diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java index 9f29cb6..01b6c20 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java @@ -22,30 +22,42 @@ import java.util.Map; */ @BaseRequest(baseURL = "http://service-info-query/info") public interface InfoService { - @Get("/table_total") - Long tableTotal(); + @Get("/table_count") + Long tableCount(); - @Get("/hudi_total") - Long hudiTotal(); + @Get("/table_focus_count") + Long tableFocusCount(); - @Get("/focus_count") - Long focusCount(); + @Get("/hudi_count") + Long hudiCount(); - @Get("/normal_count") - Long normalCount(); + @Get("/hudi_focus_count") + Long hudiFocusCount(); - @Get("/un_scheduled_normal_table_count") - Long unScheduledNormalTableCount(@Query("version") String version); - - @Get("/un_scheduled_focus_table_count") - Long unScheduledFocusTableCount(@Query("version") String version); + @Get("/un_receive_version_normal_table") + ImmutableList unReceiveVersionNormalTable(@Query("version") String version); @Get("/un_receive_version_normal_table_count") Long unReceiveVersionNormalTableCount(@Query("version") String version); + @Get("/un_receive_version_focus_table") + ImmutableList unReceiveVersionFocusTable(@Query("version") String version); + @Get("/un_receive_version_focus_table_count") Long unReceiveVersionFocusTableCount(@Query("version") String version); + @Get("/un_scheduled_normal_table") + ImmutableList unScheduledNormalTable(@Query("version") String version); + + @Get("/un_scheduled_normal_table_count") + Long unScheduledNormalTableCount(@Query("version") String version); + + @Get("/un_scheduled_focus_table") + ImmutableList unScheduledFocusTable(@Query("version") String version); + + @Get("/un_scheduled_focus_table_count") + Long unScheduledFocusTableCount(@Query("version") String version); + @Get("/job_id_alias") PageResponse jobIdAndAlias(@Query Map queryMap); diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java index 60dfcd6..3444938 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java @@ -37,26 +37,6 @@ public class InfoController { this.infoService = infoService; } - @GetMapping("table_total") - public Long tableTotal() { - return infoService.tableTotal(); - } - - @GetMapping("hudi_total") - public Long hudiTotal() { - return infoService.hudiTotal(); - } - - @GetMapping("focus_count") - public Long focusCount() { - return infoService.focusCount(); - } - - @GetMapping("normal_count") - public Long normalCount() { - return infoService.normalCount(); - } - @GetMapping("/job_id_alias") public PageResponse jobIdAndAlias( @RequestParam(value = "page", defaultValue = "1") Integer page, @@ -145,14 +125,9 @@ public class InfoController { return infoService.nonUpdatedVersionTables(); } - @GetMapping("/un_scheduled_normal_table_count") - public Long unScheduledNormalTableCount(@RequestParam("version") String version) { - return infoService.unScheduledNormalTableCount(version); - } - - @GetMapping("/un_scheduled_focus_table_count") - public Long unScheduledFocusTableCount(@RequestParam("version") String version) { - return infoService.unScheduledFocusTableCount(version); + @GetMapping("/un_receive_version_normal_table") + public ImmutableList unReceiveVersionNormalTable(@RequestParam("version") String version) { + return infoService.unReceiveVersionNormalTable(version); } @GetMapping("/un_receive_version_normal_table_count") @@ -160,8 +135,53 @@ public class InfoController { return infoService.unReceiveVersionNormalTableCount(version); } + @GetMapping("/un_receive_version_focus_table") + public ImmutableList unReceiveVersionFocusTable(@RequestParam("version") String version) { + return infoService.unReceiveVersionFocusTable(version); + } + @GetMapping("/un_receive_version_focus_table_count") public Long unReceiveVersionFocusTableCount(@RequestParam("version") String version) { return infoService.unReceiveVersionFocusTableCount(version); } + + @GetMapping("/un_scheduled_normal_table") + public ImmutableList unScheduledNormalTable(@RequestParam("version") String version) { + return infoService.unScheduledNormalTable(version); + } + + @GetMapping("/un_scheduled_normal_table_count") + public Long unScheduledNormalTableCount(@RequestParam("version") String version) { + return infoService.unScheduledNormalTableCount(version); + } + + @GetMapping("/un_scheduled_focus_table") + public ImmutableList unScheduledFocusTable(@RequestParam("version") String version) { + return infoService.unScheduledFocusTable(version); + } + + @GetMapping("/un_scheduled_focus_table_count") + public Long unScheduledFocusTableCount(@RequestParam("version") String version) { + return infoService.unScheduledFocusTableCount(version); + } + + @GetMapping("/table_count") + public Long tableCount() { + return infoService.tableCount(); + } + + @GetMapping("/table_focus_count") + public Long tableFocusCount() { + return infoService.tableFocusCount(); + } + + @GetMapping("/hudi_count") + public Long hudiCount() { + return infoService.hudiCount(); + } + + @GetMapping("/hudi_focus_count") + public Long hudiFocusCount() { + return infoService.hudiFocusCount(); + } } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index 58191b7..59bc3ae 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -343,119 +343,189 @@ public class InfoService { }); } + private SqlBuilder generateUnReceiveVersionNormalTableSql(SelectSqlBuilder builder, String version) { + return builder + .from(TABLE_INFO) + .whereLt(TABLE_INFO_PRIORITY, 10000) + .andEq(TABLE_INFO_STATUS, "y") + .andNotIn( + StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), + SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) + .from(TABLE_VERSION) + .whereEq(TABLE_VERSION_VERSION, version) + ); + } + + @Cacheable(value = "un-receive-version-normal-table", sync = true) + @Retryable(Throwable.class) + public ImmutableList unReceiveVersionNormalTable(String version) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + generateUnReceiveVersionNormalTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) + .build(), + (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) + ) + ); + } + @Cacheable(value = "un-receive-version-normal-table-count", sync = true) @Retryable(Throwable.class) public Long unReceiveVersionNormalTableCount(String version) { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(COUNT) - .from(TABLE_INFO) - .whereLt(TABLE_INFO_PRIORITY, 10000) - .andEq(TABLE_INFO_STATUS, "y") - .andNotIn( - StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), - SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) - .from(TABLE_VERSION) - .whereEq(TABLE_VERSION_VERSION, version) - ) + generateUnReceiveVersionNormalTableSql(SqlBuilder.select(COUNT), version) .build(), Long.class ); } + private SqlBuilder generateUnReceiveVersionFocusTable(SelectSqlBuilder builder, String version) { + return builder + .from(TABLE_INFO) + .whereGe(TABLE_INFO_PRIORITY, 10000) + .andEq(TABLE_INFO_STATUS, "y") + .andNotIn( + StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), + SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) + .from(TABLE_VERSION) + .whereEq(TABLE_VERSION_VERSION, version) + ); + } + + @Cacheable(value = "un-receive-version-focus-table", sync = true) + @Retryable(Throwable.class) + public ImmutableList unReceiveVersionFocusTable(String version) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + generateUnReceiveVersionFocusTable(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) + .build(), + (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) + ) + ); + } + @Cacheable(value = "un-receive-version-focus-table-count", sync = true) @Retryable(Throwable.class) public Long unReceiveVersionFocusTableCount(String version) { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(COUNT) - .from(TABLE_INFO) - .whereGe(TABLE_INFO_PRIORITY, 10000) - .andEq(TABLE_INFO_STATUS, "y") - .andNotIn( - StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), - SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) - .from(TABLE_VERSION) - .whereEq(TABLE_VERSION_VERSION, version) - ) + generateUnReceiveVersionFocusTable(SqlBuilder.select(COUNT), version) .build(), Long.class ); } + private SqlBuilder generateUnScheduledNormalTableSql(SelectSqlBuilder builder, String version) { + return builder + .from(TABLE_INFO) + .join(TABLE_VERSION) + .onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) + .andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) + .whereLt(TABLE_INFO_PRIORITY, 10000) + .andEq(TABLE_VERSION_SCHEDULED, false) + .andEq(TABLE_VERSION_VERSION, version) + .andEq(TABLE_INFO_STATUS, "y"); + } + + @Cacheable(value = "un-scheduled-normal-table", sync = true) + @Retryable(Throwable.class) + public ImmutableList unScheduledNormalTable(String version) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + generateUnScheduledNormalTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) + .build(), + (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) + ) + ); + } + @Cacheable(value = "un_scheduled_normal_table_count", sync = true) @Retryable(Throwable.class) public Long unScheduledNormalTableCount(String version) { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(COUNT) - .from(TABLE_INFO) - .join(TABLE_VERSION) - .onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) - .andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) - .whereLt(TABLE_INFO_PRIORITY, 10000) - .andEq(TABLE_VERSION_SCHEDULED, false) - .andEq(TABLE_VERSION_VERSION, version) - .andEq(TABLE_INFO_STATUS, "y") + generateUnScheduledNormalTableSql(SqlBuilder.select(COUNT), version) .build(), Long.class ); } + private SqlBuilder generateUnScheduledFocusTableSql(SelectSqlBuilder builder, String version) { + return builder + .from(TABLE_INFO) + .join(TABLE_VERSION) + .onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) + .andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) + .whereGe(TABLE_INFO_PRIORITY, 10000) + .andEq(TABLE_VERSION_SCHEDULED, false) + .andEq(TABLE_VERSION_VERSION, version) + .andEq(TABLE_INFO_STATUS, "y"); + } + + @Cacheable(value = "un-scheduled-focus-table", sync = true) + @Retryable(Throwable.class) + public ImmutableList unScheduledFocusTable(String version) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + generateUnScheduledFocusTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) + .build(), + (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) + ) + ); + } + @Cacheable(value = "un_scheduled_focus_table_count", sync = true) @Retryable(Throwable.class) public Long unScheduledFocusTableCount(String version) { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(COUNT) - .from(TABLE_INFO) - .join(TABLE_VERSION) - .onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) - .andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) - .whereGe(TABLE_INFO_PRIORITY, 10000) - .andEq(TABLE_VERSION_SCHEDULED, false) - .andEq(TABLE_VERSION_VERSION, version) - .andEq(TABLE_INFO_STATUS, "y") + generateUnScheduledFocusTableSql(SqlBuilder.select(COUNT), version) .build(), Long.class ); } - @Cacheable(value = "table-total", sync = true) + @Cacheable(value = "table-count", sync = true) @Retryable(Throwable.class) - public Long tableTotal() { - return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select("count(distinct concat(src_schema, src_table))").from(TABLE_INFO).whereEq(TABLE_INFO_STATUS, "y").build(), - Long.class - ); - } - - @Cacheable(value = "hudi-total", sync = true) - @Retryable(Throwable.class) - public Long hudiTotal() { - return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select("count(distinct tgt_hdfs_path) as count").from(TABLE_INFO).whereEq(TABLE_INFO_STATUS, "y").build(), - Long.class - ); - } - - @Cacheable(value = "focus-count", sync = true) - @Retryable(Throwable.class) - public Long focusCount() { + public Long tableCount() { return mysqlJdbcTemplate.queryForObject( SqlBuilder.select("count(distinct concat(src_schema, src_table))") .from(TABLE_INFO) - .whereGe(TABLE_INFO_PRIORITY, 10000) - .andEq(TABLE_INFO_STATUS, "y") + .whereEq(TABLE_INFO_STATUS, "y") .build(), Long.class ); } - @Cacheable(value = "normal-count", sync = true) + @Cacheable(value = "table-focus-count", sync = true) @Retryable(Throwable.class) - public Long normalCount() { + public Long tableFocusCount() { return mysqlJdbcTemplate.queryForObject( SqlBuilder.select("count(distinct concat(src_schema, src_table))") .from(TABLE_INFO) - .whereLt(TABLE_INFO_PRIORITY, 10000) - .andEq(TABLE_INFO_STATUS, "y") + .whereEq(TABLE_INFO_STATUS, "y") + .andGe(TABLE_INFO_PRIORITY, 10000) + .build(), + Long.class + ); + } + + @Cacheable(value = "hudi-count", sync = true) + @Retryable(Throwable.class) + public Long hudiCount() { + return mysqlJdbcTemplate.queryForObject( + SqlBuilder.select("count(distinct tgt_hdfs_path) as count") + .from(TABLE_INFO) + .whereEq(TABLE_INFO_STATUS, "y") + .build(), + Long.class + ); + } + + @Cacheable(value = "hudi-focus-count", sync = true) + @Retryable(Throwable.class) + public Long hudiFocusCount() { + return mysqlJdbcTemplate.queryForObject( + SqlBuilder.select("count(distinct tgt_hdfs_path) as count") + .from(TABLE_INFO) + .whereEq(TABLE_INFO_STATUS, "y") + .andGe(TABLE_INFO_PRIORITY, 10000) .build(), Long.class ); diff --git a/service-info-query/src/test/java/SqlBuilderTests.java b/service-info-query/src/test/java/SqlBuilderTests.java index c99b9f0..d28fbe3 100644 --- a/service-info-query/src/test/java/SqlBuilderTests.java +++ b/service-info-query/src/test/java/SqlBuilderTests.java @@ -22,6 +22,7 @@ public class SqlBuilderTests { private static final String TABLE_INFO_FLINK_JOB_ID = column(TABLE_INFO, "flink_job_id"); private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias"); private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority"); + private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status"); private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss"); private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id"); @@ -32,15 +33,10 @@ public class SqlBuilderTests { public static void main(String[] args) { System.out.println(SqlFormatter.format( - SqlBuilder.select(COUNT) + SqlBuilder.select("count(distinct concat(src_schema, src_table))") .from(TABLE_INFO) .whereGe(TABLE_INFO_PRIORITY, 10000) - .andNotIn( - StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), - SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) - .from(TABLE_VERSION) - .whereEq(TABLE_VERSION_VERSION, "20230611") - ) + .andEq(TABLE_INFO_STATUS, "y") .build() )); } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/OverviewController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/OverviewController.java index 7c13b80..195f8a7 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/OverviewController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/OverviewController.java @@ -50,21 +50,21 @@ public class OverviewController extends BaseController { @GetMapping("") public AmisResponse overview() throws ExecutionException, InterruptedException { - CompletableFuture tableTotalFuture = CompletableFuture.supplyAsync(infoService::tableTotal, ExecutorProvider.EXECUTORS); - CompletableFuture hudiTotalFuture = CompletableFuture.supplyAsync(infoService::hudiTotal, ExecutorProvider.EXECUTORS); - CompletableFuture focusCountFuture = CompletableFuture.supplyAsync(infoService::focusCount, ExecutorProvider.EXECUTORS); - CompletableFuture normalCountFuture = CompletableFuture.supplyAsync(infoService::normalCount, ExecutorProvider.EXECUTORS); + CompletableFuture tableCountFuture = CompletableFuture.supplyAsync(infoService::tableCount, ExecutorProvider.EXECUTORS); + CompletableFuture tableFocusCountFuture = CompletableFuture.supplyAsync(infoService::tableFocusCount, ExecutorProvider.EXECUTORS); + CompletableFuture hudiCountFuture = CompletableFuture.supplyAsync(infoService::hudiCount, ExecutorProvider.EXECUTORS); + CompletableFuture hudiFocusCountFuture = CompletableFuture.supplyAsync(infoService::hudiFocusCount, ExecutorProvider.EXECUTORS); CompletableFuture.allOf( - tableTotalFuture, - hudiTotalFuture, - focusCountFuture, - normalCountFuture + tableCountFuture, + tableFocusCountFuture, + hudiCountFuture, + hudiFocusCountFuture ); return responseData() - .withData("table_total", tableTotalFuture.get()) - .withData("hudi_total", hudiTotalFuture.get()) - .withData("focus_count", focusCountFuture.get()) - .withData("normal_count", normalCountFuture.get()); + .withData("table_count", tableCountFuture.get()) + .withData("table_focus_count", tableFocusCountFuture.get()) + .withData("hudi_count", hudiCountFuture.get()) + .withData("hudi_focus_count", hudiFocusCountFuture.get()); } @GetMapping("yarn-job") diff --git a/web/components/overview-tab.js b/web/components/overview-tab.js index 2c1cf9c..ed234af 100644 --- a/web/components/overview-tab.js +++ b/web/components/overview-tab.js @@ -35,6 +35,7 @@ function overviewYarnJob(cluster, search, queue, yarnQueue) { } ] }, + ' ', { type: 'service', className: 'inline', @@ -120,6 +121,7 @@ function overviewTab() { ] }, {type: 'divider'}, + '表数量 (重点表数量, 普通表数量)', { type: 'service', api: '${base}/overview', @@ -128,29 +130,21 @@ function overviewTab() { body: [ { type: 'tpl', - tpl: '逻辑表:${table_total}', + tpl: '逻辑表:${PADSTART(table_count, 4)} (${PADSTART(table_focus_count, 4)}, ${PADSTART(table_count - table_focus_count, 4)})', }, '
', { type: 'tpl', - tpl: '湖底表:${hudi_total}', - }, - '
', - { - type: 'tpl', - tpl: '重点表:${focus_count}', - }, - '
', - { - type: 'tpl', - tpl: '普通表:${normal_count}', + tpl: '湖底表:${PADSTART(hudi_count, 4)} (${PADSTART(hudi_focus_count, 4)}, ${PADSTART(hudi_count - hudi_focus_count, 4)})', }, ] }, {type: 'divider'}, + '集群 (集群总资源使用,队列资源使用)(总压缩任务数,调度中任务数,运行中任务数,失败任务数)', overviewYarnJob('b5-sync', 'Sync', undefined, 'default'), {type: 'divider'}, { + className: 'my-2', type: 'service', api: `\${base}/overview/queue?queue=compaction-queue-pre`, interval: 10000, @@ -158,7 +152,7 @@ function overviewTab() { body: [ { type: 'tpl', - tpl: 'Pre:${size}', + tpl: '预调度队列:${size}', } ] },