From 5b00b5faaf52dcc9ad2a20014795cc868fb17b1b Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 3 Jan 2024 11:14:37 +0800 Subject: [PATCH] =?UTF-8?q?refactor(info-query):=20=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=88=86=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将大的service类拆分到小的类中,方便后期维护和功能调整 --- .../info/controller/CompactionController.java | 53 + .../info/controller/FlinkJobController.java | 39 + .../info/controller/InfoController.java | 179 +-- .../info/controller/SyncStateController.java | 33 + .../info/controller/TableMetaController.java | 91 ++ .../info/controller/VersionController.java | 106 ++ .../service/info/service/BaseService.java | 20 + .../info/service/CompactionService.java | 191 +++ .../service/info/service/FlinkJobService.java | 91 ++ .../service/info/service/InfoService.java | 1142 +---------------- .../info/service/SyncStateService.java | 91 ++ .../info/service/TableMetaService.java | 582 +++++++++ .../service/info/service/VersionService.java | 354 +++++ 13 files changed, 1666 insertions(+), 1306 deletions(-) create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/CompactionController.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/FlinkJobController.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/SyncStateController.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/TableMetaController.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/VersionController.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/BaseService.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/CompactionService.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/FlinkJobService.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/SyncStateService.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/VersionService.java diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/CompactionController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/CompactionController.java new file mode 100644 index 0000000..0c8907e --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/CompactionController.java @@ -0,0 +1,53 @@ +package com.lanyuanxiaoyao.service.info.controller; + +import cn.hutool.core.util.ObjectUtil; +import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; +import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics; +import com.lanyuanxiaoyao.service.info.service.CompactionService; +import java.util.List; +import org.eclipse.collections.api.factory.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Compaction + * + * @author lanyuanxiaoyao + * @date 2024-01-03 + */ +@RestController +@RequestMapping("/info") +public class CompactionController { + private static final Logger logger = LoggerFactory.getLogger(CompactionController.class); + + private final CompactionService compactionService; + + public CompactionController(CompactionService compactionService) { + this.compactionService = compactionService; + } + + @GetMapping("/compaction_metrics") + public PageResponse metrics( + @RequestParam(value = "page", defaultValue = "1") Integer page, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "flink_job_id") Long flinkJobId, + @RequestParam(value = "alias") String alias, + @RequestParam(value = "order", required = false) String order, + @RequestParam(value = "direction", required = false) String direction, + @RequestParam(value = "filter_completes", required = false) List filterCompletes + ) { + return compactionService.findAllCompactionMetrics( + page, + count, + flinkJobId, + alias, + order, + direction, + ObjectUtil.isNull(filterCompletes) ? Lists.immutable.empty() : Lists.immutable.ofAll(filterCompletes) + ); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/FlinkJobController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/FlinkJobController.java new file mode 100644 index 0000000..2903761 --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/FlinkJobController.java @@ -0,0 +1,39 @@ +package com.lanyuanxiaoyao.service.info.controller; + +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.lanyuanxiaoyao.service.info.service.FlinkJobService; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Flink Job + * + * @author lanyuanxiaoyao + * @date 2024-01-03 + */ +@RestController +@RequestMapping("/info") +public class FlinkJobController { + private static final Logger logger = LoggerFactory.getLogger(FlinkJobController.class); + + private final FlinkJobService flinkJobService; + + public FlinkJobController(FlinkJobService flinkJobService) { + this.flinkJobService = flinkJobService; + } + + @GetMapping("/flink_job/list") + public ImmutableList list() { + return flinkJobService.flinkJobs(); + } + + @GetMapping("/flink_job/detail") + public FlinkJob detail(@RequestParam("flink_job_id") Long flinkJobId) { + return flinkJobService.flinkJob(flinkJobId); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java index bf781dd..9c7923f 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java @@ -1,12 +1,10 @@ package com.lanyuanxiaoyao.service.info.controller; -import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; -import com.eshore.odcp.hudi.connector.entity.FlinkJob; -import com.eshore.odcp.hudi.connector.entity.SyncState; -import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; -import com.lanyuanxiaoyao.service.configuration.entity.info.*; +import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; +import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; +import com.lanyuanxiaoyao.service.configuration.entity.info.TableInfoSearchCache; import com.lanyuanxiaoyao.service.info.service.InfoService; import java.util.List; import org.eclipse.collections.api.factory.Lists; @@ -25,7 +23,7 @@ import org.springframework.web.bind.annotation.RestController; * @date 2023-04-24 */ @RestController -@RequestMapping("info") +@RequestMapping("/info") public class InfoController { private static final Logger logger = LoggerFactory.getLogger(InfoController.class); @@ -60,172 +58,11 @@ public class InfoController { ); } - @GetMapping("/version_tables") - public PageResponse versionTables( - @RequestParam(value = "page", defaultValue = "1") Integer page, - @RequestParam(value = "count", defaultValue = "10") Integer count, - @RequestParam(value = "version", required = false) String version, - @RequestParam(value = "flink_job_id", required = false) Long flinkJobId, - @RequestParam(value = "alias", required = false) String alias, - @RequestParam(value = "order", required = false) String order, - @RequestParam(value = "direction", required = false) String direction, - @RequestParam(value = "filter_schedules", required = false) List filterSchedules - ) { - return infoService.findAllVersionTables( - page, - count, - version, - flinkJobId, - alias, - order, - direction, - Lists.immutable.ofAll(filterSchedules) - ); - } - @GetMapping("/job_metas") public ImmutableList jobAndMetas() { return infoService.jobAndMetas(); } - @GetMapping("/flink_job/list") - public ImmutableList flinkJobs() { - return infoService.flinkJobs(); - } - - @GetMapping("/flink_job/detail") - public FlinkJob flinkJob(@RequestParam("flink_job_id") Long flinkJobId) { - return infoService.flinkJob(flinkJobId); - } - - @GetMapping("/table_meta/list") - public ImmutableList tableMetas(@RequestParam(value = "flink_job_id", required = false) Long flinkJobId) { - return ObjectUtil.isNull(flinkJobId) - ? infoService.tableMetas() - : infoService.tableMetas(flinkJobId); - } - - @GetMapping("/table_meta/detail") - public TableMeta tableMeta(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) { - return infoService.tableMeta(flinkJobId, alias); - } - - @GetMapping("/sync_state/detail") - public SyncState syncState(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) { - return infoService.syncState(flinkJobId, alias); - } - - @GetMapping("/non_updated_version_tables") - public ImmutableList nonUpdatedVersionTables() { - return infoService.nonUpdatedVersionTables(); - } - - @GetMapping("/updated_version_tables") - public ImmutableList updatedVersionTables() { - return infoService.updatedVersionTables(); - } - - @GetMapping("/un_receive_version_normal_table") - public ImmutableList unReceiveVersionNormalTable(@RequestParam("version") String version) { - return infoService.unReceiveVersionNormalTable(version); - } - - @GetMapping("/un_receive_version_normal_table_count") - public Long unReceiveVersionNormalTableCount(@RequestParam("version") String version) { - return infoService.unReceiveVersionNormalTableCount(version); - } - - @GetMapping("/un_receive_version_focus_table") - public ImmutableList unReceiveVersionFocusTable(@RequestParam("version") String version) { - return infoService.unReceiveVersionFocusTable(version); - } - - @GetMapping("/un_receive_version_focus_table_count") - public Long unReceiveVersionFocusTableCount(@RequestParam("version") String version) { - return infoService.unReceiveVersionFocusTableCount(version); - } - - @GetMapping("/un_scheduled_normal_table") - public ImmutableList unScheduledNormalTable(@RequestParam("version") String version) { - return infoService.unScheduledNormalTable(version); - } - - @GetMapping("/un_scheduled_normal_table_count") - public Long unScheduledNormalTableCount(@RequestParam("version") String version) { - return infoService.unScheduledNormalTableCount(version); - } - - @GetMapping("/un_scheduled_focus_table") - public ImmutableList unScheduledFocusTable(@RequestParam("version") String version) { - return infoService.unScheduledFocusTable(version); - } - - @GetMapping("/un_scheduled_focus_table_count") - public Long unScheduledFocusTableCount(@RequestParam("version") String version) { - return infoService.unScheduledFocusTableCount(version); - } - - @GetMapping("/table_count") - public Long tableCount() { - return infoService.tableCount(); - } - - @GetMapping("/table_focus_count") - public Long tableFocusCount() { - return infoService.tableFocusCount(); - } - - @GetMapping("/hudi_count") - public Long hudiCount() { - return infoService.hudiCount(); - } - - @GetMapping("/hudi_focus_count") - public Long hudiFocusCount() { - return infoService.hudiFocusCount(); - } - - @GetMapping("/hive_count") - public Long hiveCount() { - return infoService.hiveCount(); - } - - @GetMapping("/hive_focus_count") - public Long hiveFocusCount() { - return infoService.hiveFocusCount(); - } - - @GetMapping("/compaction_metrics") - public PageResponse compactionMetrics( - @RequestParam(value = "page", defaultValue = "1") Integer page, - @RequestParam(value = "count", defaultValue = "10") Integer count, - @RequestParam(value = "flink_job_id") Long flinkJobId, - @RequestParam(value = "alias") String alias, - @RequestParam(value = "order", required = false) String order, - @RequestParam(value = "direction", required = false) String direction, - @RequestParam(value = "filter_completes", required = false) List filterCompletes - ) { - return infoService.findAllCompactionMetrics( - page, - count, - flinkJobId, - alias, - order, - direction, - ObjectUtil.isNull(filterCompletes) ? Lists.immutable.empty() : Lists.immutable.ofAll(filterCompletes) - ); - } - - @GetMapping("/exists_table") - public Boolean existsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) { - return infoService.existsTable(flinkJobId, alias); - } - - @GetMapping("/non_exists_table") - public Boolean nonExistsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) { - return !infoService.existsTable(flinkJobId, alias); - } - @GetMapping("/all_flink_job_id") public ImmutableList allFlinkJobId( @RequestParam(value = "key", required = false) String key, @@ -257,12 +94,4 @@ public class InfoController { .collect(TableInfoSearchCache::getHdfs) .distinct(); } - - @GetMapping("/simple_table_metas") - public ImmutableList simpleTableMetas( - @RequestParam(value = "flink_job_id", required = false) Long flinkJobId, - @RequestParam(value = "alias", required = false) String alias - ) { - return infoService.simpleTableMetas(flinkJobId, alias); - } } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/SyncStateController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/SyncStateController.java new file mode 100644 index 0000000..63c3bd5 --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/SyncStateController.java @@ -0,0 +1,33 @@ +package com.lanyuanxiaoyao.service.info.controller; + +import com.eshore.odcp.hudi.connector.entity.SyncState; +import com.lanyuanxiaoyao.service.info.service.SyncStateService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Sync State + * + * @author lanyaunxiaoyao + * @date 2024-01-03 + */ +@RestController +@RequestMapping("/info") +public class SyncStateController { + private static final Logger logger = LoggerFactory.getLogger(SyncStateController.class); + + private final SyncStateService syncStateService; + + public SyncStateController(SyncStateService syncStateService) { + this.syncStateService = syncStateService; + } + + @GetMapping("/sync_state/detail") + public SyncState syncState(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) { + return syncStateService.syncState(flinkJobId, alias); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/TableMetaController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/TableMetaController.java new file mode 100644 index 0000000..0d62185 --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/TableMetaController.java @@ -0,0 +1,91 @@ +package com.lanyuanxiaoyao.service.info.controller; + +import cn.hutool.core.util.ObjectUtil; +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.lanyuanxiaoyao.service.configuration.entity.info.SimpleTableMeta; +import com.lanyuanxiaoyao.service.info.service.TableMetaService; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Table Meta + * + * @author lanyuanxiaoyao + * @date 2024-01-03 + */ +@RestController +@RequestMapping("/info") +public class TableMetaController { + private static final Logger logger = LoggerFactory.getLogger(TableMetaController.class); + + private final TableMetaService tableMetaService; + + public TableMetaController(TableMetaService tableMetaService) { + this.tableMetaService = tableMetaService; + } + + @GetMapping("/table_meta/list") + public ImmutableList list(@RequestParam(value = "flink_job_id", required = false) Long flinkJobId) { + return ObjectUtil.isNull(flinkJobId) + ? tableMetaService.tableMetas() + : tableMetaService.tableMetas(flinkJobId); + } + + @GetMapping("/table_meta/detail") + public TableMeta detail(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) { + return tableMetaService.tableMeta(flinkJobId, alias); + } + + @GetMapping("/simple_table_metas") + public ImmutableList simpleList( + @RequestParam(value = "flink_job_id", required = false) Long flinkJobId, + @RequestParam(value = "alias", required = false) String alias + ) { + return tableMetaService.simpleTableMetas(flinkJobId, alias); + } + + @GetMapping("/exists_table") + public Boolean existsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) { + return tableMetaService.existsTable(flinkJobId, alias); + } + + @GetMapping("/non_exists_table") + public Boolean nonExistsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) { + return !tableMetaService.existsTable(flinkJobId, alias); + } + + @GetMapping("/table_count") + public Long tableCount() { + return tableMetaService.tableCount(); + } + + @GetMapping("/table_focus_count") + public Long tableFocusCount() { + return tableMetaService.tableFocusCount(); + } + + @GetMapping("/hudi_count") + public Long hudiCount() { + return tableMetaService.hudiCount(); + } + + @GetMapping("/hudi_focus_count") + public Long hudiFocusCount() { + return tableMetaService.hudiFocusCount(); + } + + @GetMapping("/hive_count") + public Long hiveCount() { + return tableMetaService.hiveCount(); + } + + @GetMapping("/hive_focus_count") + public Long hiveFocusCount() { + return tableMetaService.hiveFocusCount(); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/VersionController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/VersionController.java new file mode 100644 index 0000000..7242537 --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/VersionController.java @@ -0,0 +1,106 @@ +package com.lanyuanxiaoyao.service.info.controller; + +import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; +import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; +import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; +import com.lanyuanxiaoyao.service.info.service.VersionService; +import java.util.List; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Version + * + * @author lanyuanxiaoyao + * @date 2024-01-03 + */ +@RestController +@RequestMapping("/info") +public class VersionController { + private static final Logger logger = LoggerFactory.getLogger(VersionController.class); + + private final VersionService versionService; + + public VersionController(VersionService versionService) { + this.versionService = versionService; + } + + @GetMapping("/non_updated_version_tables") + public ImmutableList nonUpdatedVersionTables() { + return versionService.nonUpdatedVersionTables(); + } + + @GetMapping("/updated_version_tables") + public ImmutableList updatedVersionTables() { + return versionService.updatedVersionTables(); + } + + @GetMapping("/version_tables") + public PageResponse list( + @RequestParam(value = "page", defaultValue = "1") Integer page, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "version", required = false) String version, + @RequestParam(value = "flink_job_id", required = false) Long flinkJobId, + @RequestParam(value = "alias", required = false) String alias, + @RequestParam(value = "order", required = false) String order, + @RequestParam(value = "direction", required = false) String direction, + @RequestParam(value = "filter_schedules", required = false) List filterSchedules + ) { + return versionService.findAllVersionTables( + page, + count, + version, + flinkJobId, + alias, + order, + direction, + Lists.immutable.ofAll(filterSchedules) + ); + } + + @GetMapping("/un_scheduled_normal_table") + public ImmutableList unScheduledNormalTable(@RequestParam("version") String version) { + return versionService.unScheduledNormalTable(version); + } + + @GetMapping("/un_scheduled_normal_table_count") + public Long unScheduledNormalTableCount(@RequestParam("version") String version) { + return versionService.unScheduledNormalTableCount(version); + } + + @GetMapping("/un_scheduled_focus_table") + public ImmutableList unScheduledFocusTable(@RequestParam("version") String version) { + return versionService.unScheduledFocusTable(version); + } + + @GetMapping("/un_scheduled_focus_table_count") + public Long unScheduledFocusTableCount(@RequestParam("version") String version) { + return versionService.unScheduledFocusTableCount(version); + } + + @GetMapping("/un_receive_version_normal_table") + public ImmutableList unReceiveVersionNormalTable(@RequestParam("version") String version) { + return versionService.unReceiveVersionNormalTable(version); + } + + @GetMapping("/un_receive_version_normal_table_count") + public Long unReceiveVersionNormalTableCount(@RequestParam("version") String version) { + return versionService.unReceiveVersionNormalTableCount(version); + } + + @GetMapping("/un_receive_version_focus_table") + public ImmutableList unReceiveVersionFocusTable(@RequestParam("version") String version) { + return versionService.unReceiveVersionFocusTable(version); + } + + @GetMapping("/un_receive_version_focus_table_count") + public Long unReceiveVersionFocusTableCount(@RequestParam("version") String version) { + return versionService.unReceiveVersionFocusTableCount(version); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/BaseService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/BaseService.java new file mode 100644 index 0000000..cda7b1f --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/BaseService.java @@ -0,0 +1,20 @@ +package com.lanyuanxiaoyao.service.info.service; + +import club.kingon.sql.builder.entry.Alias; +import cn.hutool.core.util.StrUtil; + +/** + * 放公共变量 + * + * @author lanyuanxiaoyao + * @date 2024-01-03 + */ +public class BaseService { + protected static final String COUNT = "count(*)"; + protected static final String STATUS_Y = "y"; + protected static final String STATUS_N = "n"; + + protected static String column(Alias table, String column) { + return StrUtil.format("{}.{}", table.getAlias(), column); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/CompactionService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/CompactionService.java new file mode 100644 index 0000000..9c83c9e --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/CompactionService.java @@ -0,0 +1,191 @@ +package com.lanyuanxiaoyao.service.info.service; + +import club.kingon.sql.builder.SelectSqlBuilder; +import club.kingon.sql.builder.SqlBuilder; +import club.kingon.sql.builder.entry.Alias; +import club.kingon.sql.builder.entry.Column; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.SQLConstants; +import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; +import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics; +import java.sql.Timestamp; +import java.util.List; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionTemplate; + +/** + * 压缩 + * + * @author lanyuanxiaoyao + * @date 2024-01-03 + */ +@CacheConfig(cacheManager = "normal-cache") +@Service +public class CompactionService extends BaseService { + private static final Logger logger = LoggerFactory.getLogger(CompactionService.class); + private final JdbcTemplate mysqlJdbcTemplate; + private final TransactionTemplate mysqlTransactionTemplate; + + public CompactionService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate) { + this.mysqlJdbcTemplate = mysqlJdbcTemplate; + this.mysqlTransactionTemplate = mysqlTransactionTemplate; + } + + private SqlBuilder generateCompactionMetricsCriteria( + SelectSqlBuilder builder, + Integer page, + Integer count, + Long flinkJobId, + String alias, + String order, + String direction, + ImmutableList filterCompletes, + boolean limited + ) { + int limit = Math.max(count, 1); + int offset = limit * Math.max(page - 1, 0); + Alias m1 = Alias.of( + SqlBuilder.selectAll() + .from(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics._alias_) + .whereEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.TYPE_A, "pre") + .andEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.FLINK_JOB_ID_A, flinkJobId) + .andEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.ALIAS_A, alias), + "m1" + ); + Alias m2 = Alias.of( + SqlBuilder.selectAll() + .from(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics._alias_) + .whereEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.TYPE_A, "complete") + .andEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.FLINK_JOB_ID_A, flinkJobId) + .andEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.ALIAS_A, alias), + "m2" + ); + return builder + .from(m1) + .leftJoin(m2) + .onEq(column(m1, "flink_job_id"), Column.as(column(m2, "flink_job_id"))) + .andEq(column(m1, "alias"), Column.as(column(m2, "alias"))) + .andEq(column(m1, "application_id"), Column.as(column(m2, "application_id"))) + .andEq(column(m1, "compaction_plan_instant"), Column.as(column(m2, "compaction_plan_instant"))) + .whereNotNull(filterCompletes.anySatisfy(b -> b), column(m2, "type")) + .orNull(filterCompletes.anySatisfy(b -> !b), column(m2, "type")) + .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), StrUtil.format("m1.{} {}", order, direction)) + .limit(limited, offset, count); + } + + @Cacheable(value = "compaction-metrics", sync = true) + @Retryable(Throwable.class) + public PageResponse findAllCompactionMetrics( + Integer page, + Integer count, + Long flinkJobId, + String alias, + String order, + String direction, + ImmutableList filterCompletes + ) { + return mysqlTransactionTemplate.execute(status -> { + Long total = mysqlJdbcTemplate.queryForObject( + generateCompactionMetricsCriteria( + SqlBuilder.select(COUNT), + page, + count, + flinkJobId, + alias, + order, + direction, + filterCompletes, + false + ).build(), + Long.class + ); + List list = mysqlJdbcTemplate.query( + generateCompactionMetricsCriteria( + SqlBuilder.select( + "m1.flink_job_id", + "m1.alias", + "m1.application_id", + "m1.cluster", + "m1.compaction_plan_instant", + "m2.type is not null as is_complete", + "m1.update_time as started_time", + "m2.update_time as finished_time", + "m1.total_scan_time as pre_total_scan_time", + "m1.total_log_files_compacted as pre_total_log_files_compacted", + "m1.total_log_files_size as pre_total_log_files_size", + "m1.total_records_deleted as pre_total_records_deleted", + "m1.total_records_updated as pre_total_records_updated", + "m1.total_records_compacted as pre_total_records_compacted", + "m2.total_scan_time as complete_total_scan_time", + "m2.total_log_files_compacted as complete_total_log_files_compacted", + "m2.total_log_files_size as complete_total_log_files_size", + "m2.total_records_deleted as complete_total_records_deleted", + "m2.total_records_updated as complete_total_records_updated", + "m2.total_records_compacted as complete_total_records_compacted" + ), + page, + count, + flinkJobId, + alias, + order, + direction, + filterCompletes, + true + ).build(), + (rs, row) -> { + boolean isComplete = rs.getBoolean(6); + if (ObjectUtil.isNull(isComplete)) { + isComplete = false; + } + Timestamp startedTimestamp = rs.getTimestamp(7); + long startedTime = 0; + if (ObjectUtil.isNotNull(startedTimestamp)) { + startedTime = startedTimestamp.getTime(); + } + Timestamp finishedTimestamp = rs.getTimestamp(8); + long finishedTime = 0; + if (ObjectUtil.isNotNull(finishedTimestamp)) { + finishedTime = finishedTimestamp.getTime(); + } + CompactionMetrics.Statistics before = new CompactionMetrics.Statistics( + rs.getLong(9), + rs.getLong(10), + rs.getLong(11), + rs.getLong(12), + rs.getLong(13), + rs.getLong(14) + ); + CompactionMetrics.Statistics after = new CompactionMetrics.Statistics( + rs.getLong(15), + rs.getLong(16), + rs.getLong(17), + rs.getLong(18), + rs.getLong(19), + rs.getLong(20) + ); + return new CompactionMetrics( + rs.getLong(1), + rs.getString(2), + rs.getString(3), + rs.getString(4), + rs.getString(5), + isComplete, + startedTime, + finishedTime, + before, + after + ); + } + ); + return new PageResponse<>(list, total); + }); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/FlinkJobService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/FlinkJobService.java new file mode 100644 index 0000000..7b06773 --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/FlinkJobService.java @@ -0,0 +1,91 @@ +package com.lanyuanxiaoyao.service.info.service; + +import club.kingon.sql.builder.SqlBuilder; +import club.kingon.sql.builder.entry.Column; +import cn.hutool.core.util.ObjectUtil; +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.eshore.odcp.hudi.connector.exception.FlinkJobNotFoundException; +import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionTemplate; + +import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig; +import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppYarnJobConfig; + +/** + * Flink Job + * + * @author lanyuanxiaoyao + * @date 2024-01-03 + */ +@CacheConfig(cacheManager = "normal-cache") +@Service +public class FlinkJobService extends BaseService { + private static final Logger logger = LoggerFactory.getLogger(FlinkJobService.class); + private final JdbcTemplate mysqlJdbcTemplate; + + public FlinkJobService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger) { + this.mysqlJdbcTemplate = mysqlJdbcTemplate; + } + + private ImmutableList flinkJobList(Long flinkJobId) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + SqlBuilder.select( + TbAppFlinkJobConfig.ID_A, + TbAppFlinkJobConfig.NAME_A, + TbAppFlinkJobConfig.RUN_MODE_A, + TbAppYarnJobConfig.JOB_MANAGER_MEMORY_A, + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_A + ) + .from(TbAppFlinkJobConfig._alias_) + .leftJoin(TbAppYarnJobConfig._alias_) + .onEq(TbAppFlinkJobConfig.ONE_IN_ONE_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfig.ID_A)) + .whereEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) + .build(), + (rs, row) -> { + String runModeText = rs.getString(3); + FlinkJob.RunMode mode; + try { + mode = FlinkJob.RunMode.valueOf(runModeText); + } catch (IllegalArgumentException e) { + mode = FlinkJob.RunMode.ALL_IN_ONE; + } + TableMeta.YarnMeta yarnMeta = TableMeta.YarnMeta.builder() + .jobManagerMemory(rs.getInt(4)) + .taskManagerMemory(rs.getInt(5)) + .build(); + return FlinkJob.builder() + .id(rs.getLong(1)) + .name(rs.getString(2)) + .runMode(mode) + .oneInOneSyncYarn(yarnMeta) + .build(); + }) + ); + } + + @Cacheable(value = "flink-jobs", sync = true) + @Retryable(Throwable.class) + public ImmutableList flinkJobs() { + return flinkJobList(null); + } + + @Cacheable(value = "flink-jobs", sync = true, key = "#flinkJobId") + @Retryable(Throwable.class) + public FlinkJob flinkJob(Long flinkJobId) { + return flinkJobList(flinkJobId) + .getFirstOptional() + .orElseThrow(() -> new FlinkJobNotFoundException(flinkJobId)); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index 80395d0..f287eca 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -2,36 +2,17 @@ package com.lanyuanxiaoyao.service.info.service; import club.kingon.sql.builder.SelectSqlBuilder; import club.kingon.sql.builder.SqlBuilder; -import club.kingon.sql.builder.WhereSqlBuilder; -import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Column; -import cn.hutool.core.collection.IterUtil; -import cn.hutool.core.lang.Pair; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; -import com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.DataSource; -import com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.DataSourceTable; -import com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.DataSourceTableField; -import com.eshore.odcp.hudi.connector.entity.FlinkJob; -import com.eshore.odcp.hudi.connector.entity.SyncState; -import com.eshore.odcp.hudi.connector.entity.TableMeta; -import com.eshore.odcp.hudi.connector.exception.ConfigException; -import com.eshore.odcp.hudi.connector.exception.FlinkJobNotFoundException; -import com.eshore.odcp.hudi.connector.exception.SyncStateNotFoundException; -import com.eshore.odcp.hudi.connector.exception.TableMetaNotFoundException; -import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; -import com.lanyuanxiaoyao.service.configuration.entity.info.*; +import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; +import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; +import com.lanyuanxiaoyao.service.configuration.entity.info.TableInfoSearchCache; import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider; -import java.sql.Timestamp; import java.util.List; -import java.util.Locale; -import java.util.function.Function; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; -import org.eclipse.collections.api.map.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.CacheConfig; @@ -47,109 +28,20 @@ import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; * @author lanyuanxiaoyao * @date 2023-04-24 */ -@SuppressWarnings({"OptionalGetWithoutIsPresent", "SqlSourceToSinkFlow"}) @CacheConfig(cacheManager = "normal-cache") @Service -public class InfoService { +public class InfoService extends BaseService { private static final Logger logger = LoggerFactory.getLogger(InfoService.class); - private static final String COUNT = "count(*)"; - private static final String STATUS_Y = "y"; - private static final String STATUS_N = "n"; private final JdbcTemplate mysqlJdbcTemplate; private final TransactionTemplate mysqlTransactionTemplate; - private final SQLLoggerProvider.SQLLogger sqlLogger; + private final FlinkJobService flinkJobService; + private final TableMetaService tableMetaService; - public InfoService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger) { + public InfoService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger, FlinkJobService flinkJobService, TableMetaService tableMetaService) { this.mysqlJdbcTemplate = mysqlJdbcTemplate; this.mysqlTransactionTemplate = mysqlTransactionTemplate; - this.sqlLogger = sqlLogger; - } - - private static String generateVersionTableIdCriteria(Boolean scheduled) { - return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)) - .from( - TbAppCollectTableVersion._alias_, - TbAppFlinkJobConfig._alias_, - TbAppCollectTableInfo._alias_ - ) - .whereEq(TbAppCollectTableVersion.SCHEDULED_A, scheduled) - .andEq(TbAppCollectTableVersion.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) - .andEq(TbAppCollectTableVersion.ALIAS_A, Column.as(TbAppCollectTableInfo.ALIAS_A)) - .andEq(TbAppCollectTableVersion.VERSION_A, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')")) - .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .build(); - } - - private static String column(Alias table, String column) { - return StrUtil.format("{}.{}", table.getAlias(), column); - } - - private static void checkMoreThanOne(String fieldName, Iterable iterable) throws ConfigException { - ConfigException.check(fieldName + " cannot be more than 1", () -> IterUtil.size(iterable) > 1); - } - - private static void checkEmpty(String fieldName, Iterable iterable) throws ConfigException { - ConfigException.check(fieldName + " cannot be empty", () -> IterUtil.isEmpty(iterable)); - } - - private static void checkEmptyOrMoreThanOne(String fieldName, Iterable iterable) throws ConfigException { - checkEmpty(fieldName, iterable); - checkMoreThanOne(fieldName, iterable); - } - - @Cacheable(value = "sync-state", sync = true, key = "#flinkJobId.toString()+#alias") - @Retryable(Throwable.class) - public SyncState syncState(Long flinkJobId, String alias) { - return Lists.immutable.ofAll( - mysqlJdbcTemplate.query( - SqlBuilder.select( - TbAppFlinkJobConfig.ID_A, - TbAppCollectTableInfo.ALIAS_A, - TbAppHudiSyncState.MESSAGE_ID_A, - TbAppHudiSyncState.SOURCE_START_TIME_A, - TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_A, - TbAppHudiSyncState.SOURCE_PUBLISH_TIME_A, - TbAppHudiSyncState.SOURCE_OP_TIME_A, - TbAppHudiSyncState.COMPACTION_START_TIME_A, - TbAppHudiSyncState.COMPACTION_FINISH_TIME_A, - TbAppHudiSyncState.COMPACTION_APPLICATION_ID_A, - TbAppHudiSyncState.COMPACTION_STATUS_A, - TbAppHudiSyncState.COMPACTION_STATUS_TIME_A, - TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_A - ) - .from( - TbAppFlinkJobConfig._alias_, - TbAppCollectTableInfo._alias_, - TbAppHudiSyncState._alias_ - ) - .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) - .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) - .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) - .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) - .build(), - (rs, row) -> { - Function dateConvertor = timestamp -> timestamp == null ? 0 : timestamp.getTime(); - return SyncState.builder() - .flinkJobId(rs.getLong(1)) - .alias(rs.getString(2)) - .messageId(rs.getString(3)) - .sourceStartTime(dateConvertor.apply(rs.getTimestamp(4))) - .sourceCheckpointTime(dateConvertor.apply(rs.getTimestamp(5))) - .sourcePublishTime(dateConvertor.apply(rs.getTimestamp(6))) - .sourceOperationTime(dateConvertor.apply(rs.getTimestamp(7))) - .compactionStartTime(dateConvertor.apply(rs.getTimestamp(8))) - .compactionFinishTime(dateConvertor.apply(rs.getTimestamp(9))) - .compactionApplicationId(rs.getString(10)) - .compactionStatus(rs.getString(11)) - .compactionStatusTime(dateConvertor.apply(rs.getTimestamp(12))) - .compactionLatestOperationTime(dateConvertor.apply(rs.getTimestamp(13))) - .build(); - } - ) - ).getFirstOptional().orElseThrow(SyncStateNotFoundException::new); + this.flinkJobService = flinkJobService; + this.tableMetaService = tableMetaService; } private SqlBuilder generateJobIdAndAliasCriteria( @@ -272,7 +164,6 @@ public class InfoService { selectedCompactionStatus, true ).build(); - sqlLogger.log(listSQL, "findAllJobIdAndAlias"); List list = mysqlJdbcTemplate.query(listSQL, (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))); return new PageResponse<>(list, total); }); @@ -281,982 +172,8 @@ public class InfoService { @Cacheable(value = "job-metas", sync = true) @Retryable(Throwable.class) public ImmutableList jobAndMetas() { - return flinkJobs() - .collect(job -> new JobAndMetas(job, tableMetaList(job.getId()))); - } - - private ImmutableList flinkJobList(Long flinkJobId) { - return Lists.immutable.ofAll( - mysqlJdbcTemplate.query( - SqlBuilder.select( - TbAppFlinkJobConfig.ID_A, - TbAppFlinkJobConfig.NAME_A, - TbAppFlinkJobConfig.RUN_MODE_A, - TbAppYarnJobConfig.JOB_MANAGER_MEMORY_A, - TbAppYarnJobConfig.TASK_MANAGER_MEMORY_A - ) - .from(TbAppFlinkJobConfig._alias_) - .leftJoin(TbAppYarnJobConfig._alias_) - .onEq(TbAppFlinkJobConfig.ONE_IN_ONE_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfig.ID_A)) - .whereEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) - .build(), - (rs, row) -> { - String runModeText = rs.getString(3); - FlinkJob.RunMode mode; - try { - mode = FlinkJob.RunMode.valueOf(runModeText); - } catch (IllegalArgumentException e) { - mode = FlinkJob.RunMode.ALL_IN_ONE; - } - TableMeta.YarnMeta yarnMeta = TableMeta.YarnMeta.builder() - .jobManagerMemory(rs.getInt(4)) - .taskManagerMemory(rs.getInt(5)) - .build(); - return FlinkJob.builder() - .id(rs.getLong(1)) - .name(rs.getString(2)) - .runMode(mode) - .oneInOneSyncYarn(yarnMeta) - .build(); - }) - ); - } - - @Cacheable(value = "flink-jobs", sync = true) - @Retryable(Throwable.class) - public ImmutableList flinkJobs() { - return flinkJobList(null); - } - - @Cacheable(value = "flink-jobs", sync = true, key = "#flinkJobId") - @Retryable(Throwable.class) - public FlinkJob flinkJob(Long flinkJobId) { - return flinkJobList(flinkJobId) - .getFirstOptional() - .orElseThrow(() -> new FlinkJobNotFoundException(flinkJobId)); - } - - private ImmutableList tableMetaList(Long flinkJobId) { - return tableMetaList(flinkJobId, null); - } - - private ImmutableList tableMetaList(Long flinkJobId, String aliasText) { - return Lists.immutable.ofAll( - mysqlJdbcTemplate.query( - SqlBuilder.select( - DataSource.DS_NAME_A, - DataSource.SCHEMA_NAME_A, - DataSourceTable.TABLE_NAME_A, - DataSourceTable.TABLE_TYPE_A, - DataSourceTableField.FIELD_NAME_A, - DataSourceTableField.FIELD_SEQ_A, - DataSourceTableField.FIELD_TYPE_A, - DataSourceTableField.PRIMARY_KEY_A, - DataSourceTableField.PARTITION_KEY_A, - DataSourceTableField.LENGTH_A, - TbAppCollectTableInfo.TGT_DB_A, - TbAppCollectTableInfo.TGT_TABLE_A, - TbAppCollectTableInfo.TGT_TABLE_TYPE_A, - TbAppCollectTableInfo.TGT_HDFS_PATH_A, - TbAppHudiJobConfig.WRITE_TASKS_A, - TbAppHudiJobConfig.WRITE_OPERATION_A, - TbAppHudiJobConfig.WRITE_TASK_MAX_MEMORY_A, - TbAppHudiJobConfig.WRITE_BATCH_SIZE_A, - TbAppHudiJobConfig.WRITE_RATE_LIMIT_A, - TbAppCollectTableInfo.BUCKET_NUMBER_A, - TbAppHudiJobConfig.COMPACTION_STRATEGY_A, - TbAppHudiJobConfig.COMPACTION_TASKS_A, - TbAppHudiJobConfig.COMPACTION_DELTA_COMMITS_A, - TbAppHudiJobConfig.COMPACTION_DELTA_SECONDS_A, - TbAppHudiJobConfig.COMPACTION_ASYNC_ENABLED_A, - TbAppHudiJobConfig.COMPACTION_MAX_MEMORY_A, - TbAppHudiJobConfig.CONFIGS_A, - TbAppCollectTableInfo.FILTER_FIELD_A, - TbAppCollectTableInfo.FILTER_VALUES_A, - TbAppCollectTableInfo.FILTER_TYPE_A, - TbAppCollectTableInfo.SRC_TOPIC_A, - TbAppCollectTableInfo.SRC_PULSAR_ADDR_A, - Alias.of("tayjc_sync.job_manager_memory", "sync_job_manager_memory"), - Alias.of("tayjc_sync.task_manager_memory", "sync_task_manager_memory"), - Alias.of("tayjc_compaction.job_manager_memory", "compaction_job_manager_memory"), - Alias.of("tayjc_compaction.task_manager_memory", "compaction_task_manger_momory"), - TbAppCollectTableInfo.PARTITION_FIELD_A, - TbAppHudiSyncState.MESSAGE_ID_A, - TbAppGlobalConfig.METRIC_PUBLISH_URL_A, - TbAppGlobalConfig.METRIC_PROMETHEUS_URL_A, - TbAppGlobalConfig.METRIC_API_URL_A, - TbAppGlobalConfig.METRIC_PUBLISH_DELAY_A, - TbAppGlobalConfig.METRIC_PUBLISH_PERIOD_A, - TbAppGlobalConfig.METRIC_PUBLISH_TIMEOUT_A, - TbAppGlobalConfig.METRIC_PUBLISH_BATCH_A, - Alias.of(TbAppFlinkJobConfig.ID_A, "job_id"), - Alias.of(TbAppFlinkJobConfig.NAME_A, "job_name"), - TbAppGlobalConfig.CHECKPOINT_ROOT_PATH_A, - TbAppHudiJobConfig.SOURCE_TASKS_A, - TbAppCollectTableInfo.ALIAS_A, - DataSource.CONNECTION_A, - TbAppCollectTableInfo.PRIORITY_A, - DataSource.DS_TYPE_A, - TbAppHudiJobConfig.KEEP_FILE_VERSION_A, - TbAppHudiJobConfig.KEEP_COMMIT_VERSION_A, - TbAppCollectTableInfo.TAGS_A, - TbAppGlobalConfig.ZK_URL_A, - TbAppCollectTableInfo.VERSION_A, - DataSourceTableField.SCALE_A - ) - .from( - DataSource._alias_, - DataSourceTable._alias_, - DataSourceTableField._alias_, - TbAppFlinkJobConfig._alias_, - TbAppHudiJobConfig._alias_, - Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"), - Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"), - TbAppGlobalConfig._alias_, - TbAppCollectTableInfo._alias_, - TbAppHudiSyncState._alias_ - ) - .whereEq(DataSource.DS_ROLE_A, "src") - .andEq(DataSource.DS_STATE_A, STATUS_Y) - .andEq(DataSource.RECORD_STATE_A, STATUS_Y) - .andEq(DataSourceTable.DS_ID_A, Column.as(DataSource.DS_ID_A)) - .andEq(DataSourceTable.RECORD_STATE_A, STATUS_Y) - .andEq(DataSourceTableField.TABLE_ID_A, Column.as(DataSourceTable.TABLE_ID_A)) - .andEq(DataSourceTableField.RECORD_STATE_A, STATUS_Y) - .andIn(DataSource.DS_TYPE_A, "udal", "telepg") - .andEq(DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A)) - .andEq(DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A)) - .andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A)) - .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) - .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A)) - .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id")) - .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id")) - .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A)) - .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) - .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) - .andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText) - .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y) - .andEq("tayjc_sync.status", STATUS_Y) - .andEq("tayjc_compaction.status", STATUS_Y) - .orderBy(DataSourceTableField.FIELD_SEQ_A) - .build(), - (rs, row) -> TableMeta.RowMeta.builder() - .dsName(rs.getString(1)) - .schemaName(rs.getString(2)) - .tableName(rs.getString(3)) - .tableType(rs.getString(4)) - .fieldName(rs.getString(5)) - .fieldSeq(rs.getInt(6)) - .fieldType(rs.getString(7)) - .primaryKey(rs.getString(8)) - .partitionKey(rs.getString(9)) - .length(rs.getLong(10)) - .tgtDb(rs.getString(11)) - .tgtTable(rs.getString(12)) - .tgtTableType(rs.getString(13)) - .tgtHdfsPath(rs.getString(14)) - .writeTasks(rs.getInt(15)) - .writeOperation(rs.getString(16)) - .writeTaskMaxMemory(rs.getInt(17)) - .writeBatchSize(rs.getInt(18)) - .writeRateLimit(rs.getInt(19)) - .bucketIndexNumber(rs.getInt(20)) - .compactionStrategy(rs.getString(21)) - .compactionTasks(rs.getInt(22)) - .compactionDeltaCommits(rs.getInt(23)) - .compactionDeltaSeconds(rs.getInt(24)) - .compactionAsyncEnabled(rs.getString(25)) - .compactionMaxMemory(rs.getInt(26)) - .configs(rs.getString(27)) - .filterField(rs.getString(28)) - .filterValues(rs.getString(29)) - .filterType(rs.getString(30)) - .topic(rs.getString(31)) - .pulsarAddress(rs.getString(32)) - .syncJobManagerMemory(rs.getInt(33)) - .syncTaskManagerMemory(rs.getInt(34)) - .compactionJobManagerMemory(rs.getInt(35)) - .compactionTaskManagerMemory(rs.getInt(36)) - .partitionField(rs.getString(37)) - .messageId(rs.getString(38)) - .metricPublishUrl(rs.getString(39)) - .metricPrometheusUrl(rs.getString(40)) - .metricApiUrl(rs.getString(41)) - .metricPublishDelay(rs.getInt(42)) - .metricPublishPeriod(rs.getInt(43)) - .metricPublishTimeout(rs.getInt(44)) - .metricPublishBatch(rs.getInt(45)) - .jobId(rs.getLong(46)) - .jobName(rs.getString(47)) - .checkpointRootPath(rs.getString(48)) - .sourceTasks(rs.getInt(49)) - .alias(rs.getString(50)) - .connection(rs.getString(51)) - .priority(rs.getInt(52)) - .sourceType(rs.getString(53)) - .keepFileVersion(rs.getInt(54)) - .keepCommitVersion(rs.getInt(55)) - .tags(rs.getString(56)) - .zookeeperUrl(rs.getString(57)) - .version(rs.getInt(58)) - .scala(rs.getInt(59)) - .build() - ) - ) - .asParallel(ExecutorProvider.EXECUTORS, 5) - .groupBy(TableMeta.RowMeta::getAlias) - .multiValuesView() - .collect(aliasRowMetas -> { - try { - ImmutableList rows = aliasRowMetas - .toSortedListBy(TableMeta.RowMeta::getFieldSeq) - .toImmutable(); - - ImmutableList aliasList = rows.collect(TableMeta.RowMeta::getAlias).distinct(); - checkEmptyOrMoreThanOne("alias", aliasList); - String alias = aliasList.get(0); - - ImmutableList sourceTypeList = rows.collect(TableMeta.RowMeta::getSourceType).distinct(); - checkEmptyOrMoreThanOne("source_type", sourceTypeList); - String sourceTypeText = sourceTypeList.get(0).toUpperCase(); - TableMeta.SourceType sourceType; - try { - sourceType = TableMeta.SourceType.valueOf(sourceTypeText); - } catch (IllegalArgumentException e) { - throw new Exception("Cannot parse source type " + sourceTypeText); - } - - ImmutableList dsNames = rows.collect(TableMeta.RowMeta::getDsName).distinct(); - checkEmptyOrMoreThanOne("ds_name", dsNames); - String dataSource = dsNames.get(0); - - ImmutableList schemaNames = rows.collect(TableMeta.RowMeta::getSchemaName).distinct(); - checkEmptyOrMoreThanOne("schema_name", schemaNames); - String schema = schemaNames.get(0); - - ImmutableList tableNames = rows.collect(TableMeta.RowMeta::getTableName).distinct(); - // 每次只能获取 1 张表的元信息 - checkEmptyOrMoreThanOne("table_name", tableNames); - String table = tableNames.get(0); - - ImmutableList tableTypes = rows.collect(TableMeta.RowMeta::getTableType).distinct(); - checkEmptyOrMoreThanOne("table_type", tableTypes); - String type = tableTypes.get(0); - - ImmutableList filterFields = rows.collect(TableMeta.RowMeta::getFilterField).distinct(); - checkEmptyOrMoreThanOne("filter_field", filterFields); - String filterField = filterFields.get(0); - - ImmutableList filterValueList = rows.collect(TableMeta.RowMeta::getFilterValues).distinct(); - checkEmptyOrMoreThanOne("filter_values", filterValueList); - String filterValuesText = filterValueList.get(0); - ImmutableList filterValues = StrUtil.isBlank(filterValuesText) - ? Lists.immutable.empty() - : Lists.immutable.of(filterValuesText.split(",")); - ImmutableList filterTypes = rows.collect(TableMeta.RowMeta::getFilterType).distinct(); - checkEmptyOrMoreThanOne("filter_field", filterFields); - TableMeta.FilterType filterType; - try { - filterType = TableMeta.FilterType.valueOf(filterTypes.get(0)); - } catch (IllegalArgumentException e) { - filterType = TableMeta.FilterType.NONE; - } - - ImmutableList topics = rows.collect(TableMeta.RowMeta::getTopic).distinct(); - checkEmptyOrMoreThanOne("topic", topics); - String topic = topics.get(0); - - ImmutableList pulsarAddresses = rows.collect(TableMeta.RowMeta::getPulsarAddress).distinct(); - checkEmptyOrMoreThanOne("pulsar address", pulsarAddresses); - String pulsarAddress = pulsarAddresses.get(0); - - ImmutableList priorities = rows.collect(TableMeta.RowMeta::getPriority).distinct(); - checkEmptyOrMoreThanOne("priority", priorities); - Integer priority = priorities.get(0); - - ImmutableList tagTexts = rows.collect(TableMeta.RowMeta::getTags).distinct(); - checkEmptyOrMoreThanOne("tags", tagTexts); - String tagText = ObjectUtil.isNull(tagTexts.get(0)) ? "" : tagTexts.get(0); - ImmutableList tags = Lists.immutable.of(tagText.split(",")); - - ImmutableList versions = rows.collect(TableMeta.RowMeta::getVersion).distinct(); - checkEmptyOrMoreThanOne("version", versions); - Integer version = versions.get(0); - - // 获取 Hudi 配置, 因为查出来同一张表的配置都相同, 所以直接取第一条即可 - TableMeta.RowMeta example = rows.get(0); - TableMeta.HudiMeta hudiMeta = TableMeta.HudiMeta.builder() - .targetDataSource(example.getTgtDb()) - .targetTable(example.getTgtTable()) - .targetTableType(example.getTgtTableType()) - .targetHdfsPath(example.getTgtHdfsPath()) - .sourceTasks(example.getSourceTasks()) - .writeTasks(example.getWriteTasks()) - .writeOperation(example.getWriteOperation()) - .writeTaskMaxMemory(example.getWriteTaskMaxMemory()) - .writeBatchSize(example.getWriteBatchSize()) - .writeRateLimit(example.getWriteRateLimit()) - .bucketIndexNumber(example.getBucketIndexNumber()) - .compactionStrategy(example.getCompactionStrategy()) - .compactionTasks(example.getCompactionTasks()) - .compactionDeltaCommits(example.getCompactionDeltaCommits()) - .compactionDeltaSeconds(example.getCompactionDeltaSeconds()) - .compactionAsyncEnabled(example.getCompactionAsyncEnabled()) - .compactionMaxMemory(example.getCompactionMaxMemory()) - .configs(example.getConfigs()) - .keepFileVersion(example.getKeepFileVersion()) - .keepCommitVersion(example.getKeepCommitVersion()) - .build(); - TableMeta.YarnMeta syncYarnMeta = TableMeta.YarnMeta.builder() - .jobManagerMemory(example.getSyncJobManagerMemory()) - .taskManagerMemory(example.getSyncTaskManagerMemory()) - .build(); - TableMeta.YarnMeta compactionYarnMeta = TableMeta.YarnMeta.builder() - .jobManagerMemory(example.getCompactionJobManagerMemory()) - .taskManagerMemory(example.getCompactionTaskManagerMemory()) - .build(); - TableMeta.ConfigMeta configMeta = TableMeta.ConfigMeta.builder() - .messageId(example.getMessageId()) - .metricPublishUrl(example.getMetricPublishUrl()) - .metricPrometheusUrl(example.getMetricPrometheusUrl()) - .metricApiUrl(example.getMetricApiUrl()) - .metricPublishDelay(example.getMetricPublishDelay()) - .metricPublishPeriod(example.getMetricPublishPeriod()) - .metricPublishTimeout(example.getMetricPublishTimeout()) - .metricPublishBatch(example.getMetricPublishBatch()) - .checkpointRootPath(example.getCheckpointRootPath()) - .zookeeperUrl(example.getZookeeperUrl()) - .build(); - TableMeta.JobMeta jobMeta = TableMeta.JobMeta.builder() - .id(example.getJobId()) - .name(example.getJobName()) - .build(); - - TableMeta.ConnectionMeta connectionMeta = null; - String connectionText = example.getConnection(); - if (StrUtil.isNotBlank(connectionText)) { - JSONObject connectionObj = JSONUtil.parseObj(connectionText); - connectionMeta = TableMeta.ConnectionMeta.builder() - .url(connectionObj.getStr("jdbc_url")) - .user(connectionObj.getStr("jdbc_user")) - .password(connectionObj.getStr("jdbc_password")) - .driver(connectionObj.getStr("jdbc_driver")) - .build(); - } - - ImmutableList partitionFields = rows.collect(TableMeta.RowMeta::getPartitionField).distinct(); - checkEmptyOrMoreThanOne("partition_field", filterFields); - String partitionField = partitionFields.get(0); - - List primaryKeys = Lists.mutable.empty(), - partitionKeys = Lists.mutable.empty(), - fieldMetaList = Lists.mutable.empty(); - for (TableMeta.RowMeta rowMeta : rows) { - boolean isPrimaryKey = StrUtil.equals(STATUS_Y, rowMeta.getPrimaryKey()); - boolean isPartitionKey = StrUtil.equals(STATUS_Y, rowMeta.getPartitionKey()); - TableMeta.FieldMeta fieldMeta = TableMeta.FieldMeta.builder() - .name(rowMeta.getFieldName().toUpperCase(Locale.ROOT)) - .sequence(rowMeta.getFieldSeq()) - .type(rowMeta.getFieldType()) - .isPrimaryKey(isPrimaryKey) - .partitionKey(isPartitionKey) - .length(rowMeta.getLength()) - .scala(rowMeta.getScala()) - .build(); - if (isPrimaryKey) { - primaryKeys.add(fieldMeta); - } - if (isPartitionKey) { - partitionKeys.add(fieldMeta); - } - fieldMetaList.add(fieldMeta); - } - return TableMeta.builder() - .alias(alias) - .source(dataSource) - .schema(schema) - .table(table) - .type(type) - .primaryKeys(primaryKeys) - .partitionKeys(partitionKeys) - .hudi(hudiMeta) - .fields(fieldMetaList) - .filterField(filterField) - .filterValues(filterValues.toList()) - .filterType(filterType) - .topic(topic) - .pulsarAddress(pulsarAddress) - .syncYarn(syncYarnMeta) - .compactionYarn(compactionYarnMeta) - .partitionField(partitionField) - .config(configMeta) - .job(jobMeta) - .connection(connectionMeta) - .priority(priority) - .sourceType(sourceType) - .tags(tags.toList()) - .version(version) - .build(); - } catch (Throwable throwable) { - throw new RuntimeException(throwable); - } - }) - .toList() - .toImmutable(); - } - - @Cacheable(value = "table-metas", sync = true) - @Retryable(Throwable.class) - public ImmutableList tableMetas() { - return flinkJobs().flatCollect(job -> tableMetaList(job.getId())); - } - - @Cacheable(value = "table-metas", sync = true, key = "#flinkJobId") - @Retryable(Throwable.class) - public ImmutableList tableMetas(Long flinkJobId) { - return tableMetaList(flinkJobId); - } - - @Cacheable(value = "table-metas", sync = true, key = "#flinkJobId.toString()+#alias") - @Retryable(Throwable.class) - public TableMeta tableMeta(Long flinkJobId, String alias) { - return tableMetaList(flinkJobId, alias) - .getFirstOptional() - .orElseThrow(TableMetaNotFoundException::new); - } - - @Cacheable("un-updated-version-table") - @Retryable(Throwable.class) - public ImmutableList nonUpdatedVersionTables() { - return mysqlTransactionTemplate.execute(status -> { - String listSQL = generateVersionTableIdCriteria(false); - sqlLogger.log(listSQL, "nonUpdatedVersionTables"); - List ids = mysqlJdbcTemplate.queryForList(listSQL, String.class); - return Lists.immutable.ofAll(ids); - }); - } - - @Cacheable("updated-version-table") - @Retryable(Throwable.class) - public ImmutableList updatedVersionTables() { - return mysqlTransactionTemplate.execute(status -> { - String listSQL = generateVersionTableIdCriteria(true); - sqlLogger.log(listSQL, "updatedVersionTables"); - List ids = mysqlJdbcTemplate.queryForList(listSQL, String.class); - return Lists.immutable.ofAll(ids); - }); - } - - private SqlBuilder generateVersionTableCriteria( - SelectSqlBuilder builder, - Integer page, - Integer count, - String version, - Long flinkJobId, - String alias, - String order, - String direction, - ImmutableList filterSchedules, - boolean limited, - boolean groupBy - ) { - int limit = Math.max(count, 1); - int offset = limit * Math.max(page - 1, 0); - WhereSqlBuilder root = builder - .from( - TbAppFlinkJobConfig._alias_, - TbAppCollectTableInfo._alias_, - TbAppCollectTableVersion._alias_, - TbAppHudiSyncState._alias_ - ) - .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.ALIAS_A)) - .andEq(TbAppFlinkJobConfig.STATUS_A, "y") - .andEq(TbAppCollectTableInfo.STATUS_A, "y") - .andEq(TbAppCollectTableVersion.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) - .andEq(TbAppCollectTableVersion.ALIAS_A, Column.as(TbAppCollectTableInfo.ALIAS_A)) - .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("CONCAT({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) - .andLike(ObjectUtil.isNotNull(flinkJobId), TbAppCollectTableInfo.FLINK_JOB_ID_A, flinkJobId) - .andLike(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) - .andEq(StrUtil.isNotBlank(version), TbAppCollectTableVersion.VERSION_A, version) - .andIn(ObjectUtil.isNotEmpty(filterSchedules), TbAppCollectTableVersion.SCHEDULED_A, filterSchedules); - if (groupBy) { - return root.groupBy(TbAppCollectTableVersion.SCHEDULED_A); - } else { - return root - .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) - .limit(limited, offset, count); - } - } - - @Cacheable(value = "version-tables", sync = true) - @Retryable(Throwable.class) - public PageResponse findAllVersionTables( - Integer page, - Integer count, - String version, - Long flinkJobId, - String alias, - String order, - String direction, - ImmutableList filterSchedules - ) { - return mysqlTransactionTemplate.execute(status -> { - Long total = mysqlJdbcTemplate.queryForObject( - generateVersionTableCriteria( - SqlBuilder.select(COUNT), - page, - count, - version, - flinkJobId, - alias, - order, - direction, - filterSchedules, - false, - false - ).build(), - Long.class - ); - List> groupMap = mysqlJdbcTemplate.query( - generateVersionTableCriteria( - SqlBuilder.select(TbAppCollectTableVersion.SCHEDULED_A, COUNT), - page, - count, - version, - flinkJobId, - alias, - order, - direction, - filterSchedules, - false, - true - ).build(), - (rs, row) -> new Pair<>(rs.getBoolean(1), rs.getInt(2)) - ); - ImmutableMap scheduleCount = Lists.immutable.ofAll(groupMap) - .groupBy(Pair::getKey) - .toMap() - .collectValues((key, list) -> list.getOnly().getValue()) - .toImmutable(); - String listSQL = generateVersionTableCriteria( - SqlBuilder.select( - TbAppCollectTableInfo.FLINK_JOB_ID_A, - TbAppCollectTableInfo.ALIAS_A, - TbAppCollectTableVersion.VERSION_A, - TbAppCollectTableVersion.SCHEDULED_A - ), - page, - count, - version, - flinkJobId, - alias, - order, - direction, - filterSchedules, - true, - false - ).build(); - sqlLogger.log(listSQL, "findAllVersionTables"); - List list = mysqlJdbcTemplate.query( - listSQL, - (rs, row) -> new VersionUpdated( - rs.getLong(1), - rs.getString(2), - rs.getString(3), - rs.getBoolean(4) - ) - ); - return new PageResponse<>(list, total) - .withMetadata("scheduled", scheduleCount.getOrDefault(true, 0)) - .withMetadata("unScheduled", scheduleCount.getOrDefault(false, 0)); - }); - } - - private SqlBuilder generateUnReceiveVersionNormalTableSql(SelectSqlBuilder builder, String version) { - return builder - .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) - .whereLt(TbAppCollectTableInfo.PRIORITY_A, 10000) - .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) - .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andNotIn( - StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A), - SqlBuilder.select(StrUtil.format("concat({}, {})", TbAppCollectTableVersion.FLINK_JOB_ID_A, TbAppCollectTableVersion.ALIAS_A)) - .from(TbAppCollectTableVersion._alias_) - .whereEq(TbAppCollectTableVersion.VERSION_A, 1) - ); - } - - @Cacheable(value = "un-receive-version-normal-table", sync = true) - @Retryable(Throwable.class) - public ImmutableList unReceiveVersionNormalTable(String version) { - return Lists.immutable.ofAll( - mysqlJdbcTemplate.query( - generateUnReceiveVersionNormalTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) - .build(), - (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) - ) - ); - } - - @Cacheable(value = "un-receive-version-normal-table-count", sync = true) - @Retryable(Throwable.class) - public Long unReceiveVersionNormalTableCount(String version) { - return mysqlJdbcTemplate.queryForObject( - generateUnReceiveVersionNormalTableSql(SqlBuilder.select(COUNT), version) - .build(), - Long.class - ); - } - - private SqlBuilder generateUnReceiveVersionFocusTable(SelectSqlBuilder builder, String version) { - return builder - .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) - .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000) - .andEq(TbAppCollectTableInfo.STATUS_A, "y") - .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) - .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andNotIn( - StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A), - SqlBuilder.select(StrUtil.format("concat({}, {})", TbAppCollectTableVersion.FLINK_JOB_ID_A, TbAppCollectTableVersion.ALIAS_A)) - .from(TbAppCollectTableVersion._alias_) - .whereEq(TbAppCollectTableVersion.VERSION_A, version) - ); - } - - @Cacheable(value = "un-receive-version-focus-table", sync = true) - @Retryable(Throwable.class) - public ImmutableList unReceiveVersionFocusTable(String version) { - return Lists.immutable.ofAll( - mysqlJdbcTemplate.query( - generateUnReceiveVersionFocusTable(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) - .build(), - (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) - ) - ); - } - - @Cacheable(value = "un-receive-version-focus-table-count", sync = true) - @Retryable(Throwable.class) - public Long unReceiveVersionFocusTableCount(String version) { - return mysqlJdbcTemplate.queryForObject( - generateUnReceiveVersionFocusTable(SqlBuilder.select(COUNT), version) - .build(), - Long.class - ); - } - - private SqlBuilder generateUnScheduledNormalTableSql(SelectSqlBuilder builder, String version) { - return builder - .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_, TbAppCollectTableVersion._alias_) - .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) - .andEq(TbAppFlinkJobConfig.STATUS_A, "y") - .andEq(TbAppCollectTableInfo.STATUS_A, "y") - .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) - .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) - .andEq(TbAppCollectTableInfo.PRIORITY_A, 10000) - .andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false) - .andEq(TbAppCollectTableVersion.VERSION_A, "2018"); - } - - @Cacheable(value = "un-scheduled-normal-table", sync = true) - @Retryable(Throwable.class) - public ImmutableList unScheduledNormalTable(String version) { - return Lists.immutable.ofAll( - mysqlJdbcTemplate.query( - generateUnScheduledNormalTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) - .build(), - (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) - ) - ); - } - - @Cacheable(value = "un_scheduled_normal_table_count", sync = true) - @Retryable(Throwable.class) - public Long unScheduledNormalTableCount(String version) { - return mysqlJdbcTemplate.queryForObject( - generateUnScheduledNormalTableSql(SqlBuilder.select(COUNT), version) - .build(), - Long.class - ); - } - - private SqlBuilder generateUnScheduledFocusTableSql(SelectSqlBuilder builder, String version) { - return builder - .from(TbAppCollectTableInfo._alias_) - .join(TbAppCollectTableVersion._alias_) - .onEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) - .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) - .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000) - .andEq(TbAppCollectTableVersion.SCHEDULED_A, false) - .andEq(TbAppCollectTableVersion.VERSION_A, version) - .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y); - } - - @Cacheable(value = "un-scheduled-focus-table", sync = true) - @Retryable(Throwable.class) - public ImmutableList unScheduledFocusTable(String version) { - return Lists.immutable.ofAll( - mysqlJdbcTemplate.query( - generateUnScheduledFocusTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) - .build(), - (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) - ) - ); - } - - @Cacheable(value = "un_scheduled_focus_table_count", sync = true) - @Retryable(Throwable.class) - public Long unScheduledFocusTableCount(String version) { - return mysqlJdbcTemplate.queryForObject( - generateUnScheduledFocusTableSql(SqlBuilder.select(COUNT), version) - .build(), - Long.class - ); - } - - @Cacheable(value = "table-count", sync = true) - @Retryable(Throwable.class) - public Long tableCount() { - return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .build(), - Long.class - ); - } - - @Cacheable(value = "table-focus-count", sync = true) - @Retryable(Throwable.class) - public Long tableFocusCount() { - return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) - .build(), - Long.class - ); - } - - @Cacheable(value = "hudi-count", sync = true) - @Retryable(Throwable.class) - public Long hudiCount() { - return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .build(), - Long.class - ); - } - - @Cacheable(value = "hudi-focus-count", sync = true) - @Retryable(Throwable.class) - public Long hudiFocusCount() { - return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) - .build(), - Long.class - ); - } - - @Cacheable(value = "hive-count", sync = true) - @Retryable(Throwable.class) - public Long hiveCount() { - return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .build(), - Long.class - ); - } - - @Cacheable(value = "hive-focus-count", sync = true) - @Retryable(Throwable.class) - public Long hiveFocusCount() { - return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) - .build(), - Long.class - ); - } - - private SqlBuilder generateCompactionMetricsCriteria( - SelectSqlBuilder builder, - Integer page, - Integer count, - Long flinkJobId, - String alias, - String order, - String direction, - ImmutableList filterCompletes, - boolean limited - ) { - int limit = Math.max(count, 1); - int offset = limit * Math.max(page - 1, 0); - Alias m1 = Alias.of( - SqlBuilder.selectAll() - .from(TbAppHudiCompactionMetrics._alias_) - .whereEq(TbAppHudiCompactionMetrics.TYPE_A, "pre") - .andEq(TbAppHudiCompactionMetrics.FLINK_JOB_ID_A, flinkJobId) - .andEq(TbAppHudiCompactionMetrics.ALIAS_A, alias), - "m1" - ); - Alias m2 = Alias.of( - SqlBuilder.selectAll() - .from(TbAppHudiCompactionMetrics._alias_) - .whereEq(TbAppHudiCompactionMetrics.TYPE_A, "complete") - .andEq(TbAppHudiCompactionMetrics.FLINK_JOB_ID_A, flinkJobId) - .andEq(TbAppHudiCompactionMetrics.ALIAS_A, alias), - "m2" - ); - return builder - .from(m1) - .leftJoin(m2) - .onEq(column(m1, "flink_job_id"), Column.as(column(m2, "flink_job_id"))) - .andEq(column(m1, "alias"), Column.as(column(m2, "alias"))) - .andEq(column(m1, "application_id"), Column.as(column(m2, "application_id"))) - .andEq(column(m1, "compaction_plan_instant"), Column.as(column(m2, "compaction_plan_instant"))) - .whereNotNull(filterCompletes.anySatisfy(b -> b), column(m2, "type")) - .orNull(filterCompletes.anySatisfy(b -> !b), column(m2, "type")) - .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), StrUtil.format("m1.{} {}", order, direction)) - .limit(limited, offset, count); - } - - @Cacheable(value = "compaction-metrics", sync = true) - @Retryable(Throwable.class) - public PageResponse findAllCompactionMetrics( - Integer page, - Integer count, - Long flinkJobId, - String alias, - String order, - String direction, - ImmutableList filterCompletes - ) { - return mysqlTransactionTemplate.execute(status -> { - Long total = mysqlJdbcTemplate.queryForObject( - generateCompactionMetricsCriteria( - SqlBuilder.select(COUNT), - page, - count, - flinkJobId, - alias, - order, - direction, - filterCompletes, - false - ).build(), - Long.class - ); - List list = mysqlJdbcTemplate.query( - generateCompactionMetricsCriteria( - SqlBuilder.select( - "m1.flink_job_id", - "m1.alias", - "m1.application_id", - "m1.cluster", - "m1.compaction_plan_instant", - "m2.type is not null as is_complete", - "m1.update_time as started_time", - "m2.update_time as finished_time", - "m1.total_scan_time as pre_total_scan_time", - "m1.total_log_files_compacted as pre_total_log_files_compacted", - "m1.total_log_files_size as pre_total_log_files_size", - "m1.total_records_deleted as pre_total_records_deleted", - "m1.total_records_updated as pre_total_records_updated", - "m1.total_records_compacted as pre_total_records_compacted", - "m2.total_scan_time as complete_total_scan_time", - "m2.total_log_files_compacted as complete_total_log_files_compacted", - "m2.total_log_files_size as complete_total_log_files_size", - "m2.total_records_deleted as complete_total_records_deleted", - "m2.total_records_updated as complete_total_records_updated", - "m2.total_records_compacted as complete_total_records_compacted" - ), - page, - count, - flinkJobId, - alias, - order, - direction, - filterCompletes, - true - ).build(), - (rs, row) -> { - boolean isComplete = rs.getBoolean(6); - if (ObjectUtil.isNull(isComplete)) { - isComplete = false; - } - Timestamp startedTimestamp = rs.getTimestamp(7); - long startedTime = 0; - if (ObjectUtil.isNotNull(startedTimestamp)) { - startedTime = startedTimestamp.getTime(); - } - Timestamp finishedTimestamp = rs.getTimestamp(8); - long finishedTime = 0; - if (ObjectUtil.isNotNull(finishedTimestamp)) { - finishedTime = finishedTimestamp.getTime(); - } - CompactionMetrics.Statistics before = new CompactionMetrics.Statistics( - rs.getLong(9), - rs.getLong(10), - rs.getLong(11), - rs.getLong(12), - rs.getLong(13), - rs.getLong(14) - ); - CompactionMetrics.Statistics after = new CompactionMetrics.Statistics( - rs.getLong(15), - rs.getLong(16), - rs.getLong(17), - rs.getLong(18), - rs.getLong(19), - rs.getLong(20) - ); - return new CompactionMetrics( - rs.getLong(1), - rs.getString(2), - rs.getString(3), - rs.getString(4), - rs.getString(5), - isComplete, - startedTime, - finishedTime, - before, - after - ); - } - ); - return new PageResponse<>(list, total); - }); - } - - @Cacheable(value = "exists-table", sync = true) - @Retryable(Throwable.class) - public Boolean existsTable(Long flinkJobId, String alias) { - return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select("count(*) > 0") - .from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_) - .whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) - .andEq(TbAppFlinkJobConfig.ID_A, flinkJobId) - .andEq(TbAppCollectTableInfo.ALIAS_A, alias) - .andEq(TbAppFlinkJobConfig.STATUS_A, "y") - .andEq(TbAppCollectTableInfo.STATUS_A, "y") - .build(), - Boolean.class - ); + return flinkJobService.flinkJobs() + .collect(job -> new JobAndMetas(job, tableMetaService.tableMetaList(job.getId()))); } @Cacheable(value = "all-table-info-search-cache", sync = true, cacheManager = "long-cache") @@ -1272,41 +189,4 @@ public class InfoService { (rs, row) -> new TableInfoSearchCache(rs.getLong(1), rs.getString(2), rs.getString(3)) )); } - - @Cacheable(value = "simple-table-metas", sync = true, cacheManager = "long-cache") - @Retryable(Throwable.class) - public ImmutableList simpleTableMetas(Long flinkJobId, String alias) { - return Lists.immutable.ofAll( - mysqlJdbcTemplate.query( - SqlBuilder - .select( - TbAppFlinkJobConfig.ID_A, - TbAppFlinkJobConfig.NAME_A, - TbAppCollectTableInfo.ALIAS_A, - TbAppCollectTableInfo.SRC_SCHEMA_A, - TbAppCollectTableInfo.SRC_TABLE_A, - TbAppCollectTableInfo.TGT_DB_A, - TbAppCollectTableInfo.TGT_TABLE_A, - TbAppCollectTableInfo.TGT_HDFS_PATH_A - ) - .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) - .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) - .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) - .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) - .build(), - (rs, row) -> new SimpleTableMeta( - rs.getLong(1), - rs.getString(2), - rs.getString(3), - rs.getString(4), - rs.getString(5), - rs.getString(6), - rs.getString(7), - rs.getString(8) - ) - ) - ); - } } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/SyncStateService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/SyncStateService.java new file mode 100644 index 0000000..9011260 --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/SyncStateService.java @@ -0,0 +1,91 @@ +package com.lanyuanxiaoyao.service.info.service; + +import club.kingon.sql.builder.SqlBuilder; +import club.kingon.sql.builder.entry.Column; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.entity.SyncState; +import com.eshore.odcp.hudi.connector.exception.SyncStateNotFoundException; +import java.sql.Timestamp; +import java.util.function.Function; +import org.eclipse.collections.api.factory.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; + +import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; + +/** + * Sync State + * + * @author lanyuanxiaoyao + * @date 2024-01-03 + */ +@CacheConfig(cacheManager = "normal-cache") +@Service +public class SyncStateService extends BaseService { + private static final Logger logger = LoggerFactory.getLogger(SyncStateService.class); + private final JdbcTemplate mysqlJdbcTemplate; + + public SyncStateService(JdbcTemplate mysqlJdbcTemplate) { + this.mysqlJdbcTemplate = mysqlJdbcTemplate; + } + + @Cacheable(value = "sync-state", sync = true, key = "#flinkJobId.toString()+#alias") + @Retryable(Throwable.class) + public SyncState syncState(Long flinkJobId, String alias) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + SqlBuilder.select( + TbAppFlinkJobConfig.ID_A, + TbAppCollectTableInfo.ALIAS_A, + TbAppHudiSyncState.MESSAGE_ID_A, + TbAppHudiSyncState.SOURCE_START_TIME_A, + TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_A, + TbAppHudiSyncState.SOURCE_PUBLISH_TIME_A, + TbAppHudiSyncState.SOURCE_OP_TIME_A, + TbAppHudiSyncState.COMPACTION_START_TIME_A, + TbAppHudiSyncState.COMPACTION_FINISH_TIME_A, + TbAppHudiSyncState.COMPACTION_APPLICATION_ID_A, + TbAppHudiSyncState.COMPACTION_STATUS_A, + TbAppHudiSyncState.COMPACTION_STATUS_TIME_A, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_A + ) + .from( + TbAppFlinkJobConfig._alias_, + TbAppCollectTableInfo._alias_, + TbAppHudiSyncState._alias_ + ) + .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) + .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) + .build(), + (rs, row) -> { + Function dateConvertor = timestamp -> timestamp == null ? 0 : timestamp.getTime(); + return SyncState.builder() + .flinkJobId(rs.getLong(1)) + .alias(rs.getString(2)) + .messageId(rs.getString(3)) + .sourceStartTime(dateConvertor.apply(rs.getTimestamp(4))) + .sourceCheckpointTime(dateConvertor.apply(rs.getTimestamp(5))) + .sourcePublishTime(dateConvertor.apply(rs.getTimestamp(6))) + .sourceOperationTime(dateConvertor.apply(rs.getTimestamp(7))) + .compactionStartTime(dateConvertor.apply(rs.getTimestamp(8))) + .compactionFinishTime(dateConvertor.apply(rs.getTimestamp(9))) + .compactionApplicationId(rs.getString(10)) + .compactionStatus(rs.getString(11)) + .compactionStatusTime(dateConvertor.apply(rs.getTimestamp(12))) + .compactionLatestOperationTime(dateConvertor.apply(rs.getTimestamp(13))) + .build(); + } + ) + ).getFirstOptional().orElseThrow(SyncStateNotFoundException::new); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java new file mode 100644 index 0000000..011209f --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java @@ -0,0 +1,582 @@ +package com.lanyuanxiaoyao.service.info.service; + +import club.kingon.sql.builder.SqlBuilder; +import club.kingon.sql.builder.entry.Alias; +import club.kingon.sql.builder.entry.Column; +import cn.hutool.core.collection.IterUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.eshore.odcp.hudi.connector.exception.ConfigException; +import com.eshore.odcp.hudi.connector.exception.TableMetaNotFoundException; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import com.lanyuanxiaoyao.service.configuration.entity.info.SimpleTableMeta; +import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider; +import java.util.List; +import java.util.Locale; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionTemplate; + +import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; +import static com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.*; + +/** + * Table Meta + * + * @author lanyuanxiaoyao + * @date 2024-01-03 + */ +@CacheConfig(cacheManager = "normal-cache") +@Service +public class TableMetaService extends BaseService { + private static final Logger logger = LoggerFactory.getLogger(TableMetaService.class); + private final JdbcTemplate mysqlJdbcTemplate; + private final FlinkJobService flinkJobService; + + public TableMetaService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger, FlinkJobService flinkJobService) { + this.mysqlJdbcTemplate = mysqlJdbcTemplate; + this.flinkJobService = flinkJobService; + } + + private static void checkMoreThanOne(String fieldName, Iterable iterable) throws ConfigException { + ConfigException.check(fieldName + " cannot be more than 1", () -> IterUtil.size(iterable) > 1); + } + + private static void checkEmpty(String fieldName, Iterable iterable) throws ConfigException { + ConfigException.check(fieldName + " cannot be empty", () -> IterUtil.isEmpty(iterable)); + } + + private static void checkEmptyOrMoreThanOne(String fieldName, Iterable iterable) throws ConfigException { + checkEmpty(fieldName, iterable); + checkMoreThanOne(fieldName, iterable); + } + + public ImmutableList tableMetaList(Long flinkJobId) { + return tableMetaList(flinkJobId, null); + } + + private ImmutableList tableMetaList(Long flinkJobId, String aliasText) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + SqlBuilder.select( + DataSource.DS_NAME_A, + DataSource.SCHEMA_NAME_A, + DataSourceTable.TABLE_NAME_A, + DataSourceTable.TABLE_TYPE_A, + DataSourceTableField.FIELD_NAME_A, + DataSourceTableField.FIELD_SEQ_A, + DataSourceTableField.FIELD_TYPE_A, + DataSourceTableField.PRIMARY_KEY_A, + DataSourceTableField.PARTITION_KEY_A, + DataSourceTableField.LENGTH_A, + TbAppCollectTableInfo.TGT_DB_A, + TbAppCollectTableInfo.TGT_TABLE_A, + TbAppCollectTableInfo.TGT_TABLE_TYPE_A, + TbAppCollectTableInfo.TGT_HDFS_PATH_A, + TbAppHudiJobConfig.WRITE_TASKS_A, + TbAppHudiJobConfig.WRITE_OPERATION_A, + TbAppHudiJobConfig.WRITE_TASK_MAX_MEMORY_A, + TbAppHudiJobConfig.WRITE_BATCH_SIZE_A, + TbAppHudiJobConfig.WRITE_RATE_LIMIT_A, + TbAppCollectTableInfo.BUCKET_NUMBER_A, + TbAppHudiJobConfig.COMPACTION_STRATEGY_A, + TbAppHudiJobConfig.COMPACTION_TASKS_A, + TbAppHudiJobConfig.COMPACTION_DELTA_COMMITS_A, + TbAppHudiJobConfig.COMPACTION_DELTA_SECONDS_A, + TbAppHudiJobConfig.COMPACTION_ASYNC_ENABLED_A, + TbAppHudiJobConfig.COMPACTION_MAX_MEMORY_A, + TbAppHudiJobConfig.CONFIGS_A, + TbAppCollectTableInfo.FILTER_FIELD_A, + TbAppCollectTableInfo.FILTER_VALUES_A, + TbAppCollectTableInfo.FILTER_TYPE_A, + TbAppCollectTableInfo.SRC_TOPIC_A, + TbAppCollectTableInfo.SRC_PULSAR_ADDR_A, + Alias.of("tayjc_sync.job_manager_memory", "sync_job_manager_memory"), + Alias.of("tayjc_sync.task_manager_memory", "sync_task_manager_memory"), + Alias.of("tayjc_compaction.job_manager_memory", "compaction_job_manager_memory"), + Alias.of("tayjc_compaction.task_manager_memory", "compaction_task_manger_momory"), + TbAppCollectTableInfo.PARTITION_FIELD_A, + TbAppHudiSyncState.MESSAGE_ID_A, + TbAppGlobalConfig.METRIC_PUBLISH_URL_A, + TbAppGlobalConfig.METRIC_PROMETHEUS_URL_A, + TbAppGlobalConfig.METRIC_API_URL_A, + TbAppGlobalConfig.METRIC_PUBLISH_DELAY_A, + TbAppGlobalConfig.METRIC_PUBLISH_PERIOD_A, + TbAppGlobalConfig.METRIC_PUBLISH_TIMEOUT_A, + TbAppGlobalConfig.METRIC_PUBLISH_BATCH_A, + Alias.of(TbAppFlinkJobConfig.ID_A, "job_id"), + Alias.of(TbAppFlinkJobConfig.NAME_A, "job_name"), + TbAppGlobalConfig.CHECKPOINT_ROOT_PATH_A, + TbAppHudiJobConfig.SOURCE_TASKS_A, + TbAppCollectTableInfo.ALIAS_A, + DataSource.CONNECTION_A, + TbAppCollectTableInfo.PRIORITY_A, + DataSource.DS_TYPE_A, + TbAppHudiJobConfig.KEEP_FILE_VERSION_A, + TbAppHudiJobConfig.KEEP_COMMIT_VERSION_A, + TbAppCollectTableInfo.TAGS_A, + TbAppGlobalConfig.ZK_URL_A, + TbAppCollectTableInfo.VERSION_A, + DataSourceTableField.SCALE_A + ) + .from( + DataSource._alias_, + DataSourceTable._alias_, + DataSourceTableField._alias_, + TbAppFlinkJobConfig._alias_, + TbAppHudiJobConfig._alias_, + Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"), + Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"), + TbAppGlobalConfig._alias_, + TbAppCollectTableInfo._alias_, + TbAppHudiSyncState._alias_ + ) + .whereEq(DataSource.DS_ROLE_A, "src") + .andEq(DataSource.DS_STATE_A, STATUS_Y) + .andEq(DataSource.RECORD_STATE_A, STATUS_Y) + .andEq(DataSourceTable.DS_ID_A, Column.as(DataSource.DS_ID_A)) + .andEq(DataSourceTable.RECORD_STATE_A, STATUS_Y) + .andEq(DataSourceTableField.TABLE_ID_A, Column.as(DataSourceTable.TABLE_ID_A)) + .andEq(DataSourceTableField.RECORD_STATE_A, STATUS_Y) + .andIn(DataSource.DS_TYPE_A, "udal", "telepg") + .andEq(DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A)) + .andEq(DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A)) + .andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A)) + .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) + .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A)) + .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id")) + .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id")) + .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A)) + .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) + .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) + .andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y) + .andEq("tayjc_sync.status", STATUS_Y) + .andEq("tayjc_compaction.status", STATUS_Y) + .orderBy(DataSourceTableField.FIELD_SEQ_A) + .build(), + (rs, row) -> TableMeta.RowMeta.builder() + .dsName(rs.getString(1)) + .schemaName(rs.getString(2)) + .tableName(rs.getString(3)) + .tableType(rs.getString(4)) + .fieldName(rs.getString(5)) + .fieldSeq(rs.getInt(6)) + .fieldType(rs.getString(7)) + .primaryKey(rs.getString(8)) + .partitionKey(rs.getString(9)) + .length(rs.getLong(10)) + .tgtDb(rs.getString(11)) + .tgtTable(rs.getString(12)) + .tgtTableType(rs.getString(13)) + .tgtHdfsPath(rs.getString(14)) + .writeTasks(rs.getInt(15)) + .writeOperation(rs.getString(16)) + .writeTaskMaxMemory(rs.getInt(17)) + .writeBatchSize(rs.getInt(18)) + .writeRateLimit(rs.getInt(19)) + .bucketIndexNumber(rs.getInt(20)) + .compactionStrategy(rs.getString(21)) + .compactionTasks(rs.getInt(22)) + .compactionDeltaCommits(rs.getInt(23)) + .compactionDeltaSeconds(rs.getInt(24)) + .compactionAsyncEnabled(rs.getString(25)) + .compactionMaxMemory(rs.getInt(26)) + .configs(rs.getString(27)) + .filterField(rs.getString(28)) + .filterValues(rs.getString(29)) + .filterType(rs.getString(30)) + .topic(rs.getString(31)) + .pulsarAddress(rs.getString(32)) + .syncJobManagerMemory(rs.getInt(33)) + .syncTaskManagerMemory(rs.getInt(34)) + .compactionJobManagerMemory(rs.getInt(35)) + .compactionTaskManagerMemory(rs.getInt(36)) + .partitionField(rs.getString(37)) + .messageId(rs.getString(38)) + .metricPublishUrl(rs.getString(39)) + .metricPrometheusUrl(rs.getString(40)) + .metricApiUrl(rs.getString(41)) + .metricPublishDelay(rs.getInt(42)) + .metricPublishPeriod(rs.getInt(43)) + .metricPublishTimeout(rs.getInt(44)) + .metricPublishBatch(rs.getInt(45)) + .jobId(rs.getLong(46)) + .jobName(rs.getString(47)) + .checkpointRootPath(rs.getString(48)) + .sourceTasks(rs.getInt(49)) + .alias(rs.getString(50)) + .connection(rs.getString(51)) + .priority(rs.getInt(52)) + .sourceType(rs.getString(53)) + .keepFileVersion(rs.getInt(54)) + .keepCommitVersion(rs.getInt(55)) + .tags(rs.getString(56)) + .zookeeperUrl(rs.getString(57)) + .version(rs.getInt(58)) + .scala(rs.getInt(59)) + .build() + ) + ) + .asParallel(ExecutorProvider.EXECUTORS, 5) + .groupBy(TableMeta.RowMeta::getAlias) + .multiValuesView() + .collect(aliasRowMetas -> { + try { + ImmutableList rows = aliasRowMetas + .toSortedListBy(TableMeta.RowMeta::getFieldSeq) + .toImmutable(); + + ImmutableList aliasList = rows.collect(TableMeta.RowMeta::getAlias).distinct(); + checkEmptyOrMoreThanOne("alias", aliasList); + String alias = aliasList.get(0); + + ImmutableList sourceTypeList = rows.collect(TableMeta.RowMeta::getSourceType).distinct(); + checkEmptyOrMoreThanOne("source_type", sourceTypeList); + String sourceTypeText = sourceTypeList.get(0).toUpperCase(); + TableMeta.SourceType sourceType; + try { + sourceType = TableMeta.SourceType.valueOf(sourceTypeText); + } catch (IllegalArgumentException e) { + throw new Exception("Cannot parse source type " + sourceTypeText); + } + + ImmutableList dsNames = rows.collect(TableMeta.RowMeta::getDsName).distinct(); + checkEmptyOrMoreThanOne("ds_name", dsNames); + String dataSource = dsNames.get(0); + + ImmutableList schemaNames = rows.collect(TableMeta.RowMeta::getSchemaName).distinct(); + checkEmptyOrMoreThanOne("schema_name", schemaNames); + String schema = schemaNames.get(0); + + ImmutableList tableNames = rows.collect(TableMeta.RowMeta::getTableName).distinct(); + // 每次只能获取 1 张表的元信息 + checkEmptyOrMoreThanOne("table_name", tableNames); + String table = tableNames.get(0); + + ImmutableList tableTypes = rows.collect(TableMeta.RowMeta::getTableType).distinct(); + checkEmptyOrMoreThanOne("table_type", tableTypes); + String type = tableTypes.get(0); + + ImmutableList filterFields = rows.collect(TableMeta.RowMeta::getFilterField).distinct(); + checkEmptyOrMoreThanOne("filter_field", filterFields); + String filterField = filterFields.get(0); + + ImmutableList filterValueList = rows.collect(TableMeta.RowMeta::getFilterValues).distinct(); + checkEmptyOrMoreThanOne("filter_values", filterValueList); + String filterValuesText = filterValueList.get(0); + ImmutableList filterValues = StrUtil.isBlank(filterValuesText) + ? Lists.immutable.empty() + : Lists.immutable.of(filterValuesText.split(",")); + ImmutableList filterTypes = rows.collect(TableMeta.RowMeta::getFilterType).distinct(); + checkEmptyOrMoreThanOne("filter_field", filterFields); + TableMeta.FilterType filterType; + try { + filterType = TableMeta.FilterType.valueOf(filterTypes.get(0)); + } catch (IllegalArgumentException e) { + filterType = TableMeta.FilterType.NONE; + } + + ImmutableList topics = rows.collect(TableMeta.RowMeta::getTopic).distinct(); + checkEmptyOrMoreThanOne("topic", topics); + String topic = topics.get(0); + + ImmutableList pulsarAddresses = rows.collect(TableMeta.RowMeta::getPulsarAddress).distinct(); + checkEmptyOrMoreThanOne("pulsar address", pulsarAddresses); + String pulsarAddress = pulsarAddresses.get(0); + + ImmutableList priorities = rows.collect(TableMeta.RowMeta::getPriority).distinct(); + checkEmptyOrMoreThanOne("priority", priorities); + Integer priority = priorities.get(0); + + ImmutableList tagTexts = rows.collect(TableMeta.RowMeta::getTags).distinct(); + checkEmptyOrMoreThanOne("tags", tagTexts); + String tagText = ObjectUtil.isNull(tagTexts.get(0)) ? "" : tagTexts.get(0); + ImmutableList tags = Lists.immutable.of(tagText.split(",")); + + ImmutableList versions = rows.collect(TableMeta.RowMeta::getVersion).distinct(); + checkEmptyOrMoreThanOne("version", versions); + Integer version = versions.get(0); + + // 获取 Hudi 配置, 因为查出来同一张表的配置都相同, 所以直接取第一条即可 + TableMeta.RowMeta example = rows.get(0); + TableMeta.HudiMeta hudiMeta = TableMeta.HudiMeta.builder() + .targetDataSource(example.getTgtDb()) + .targetTable(example.getTgtTable()) + .targetTableType(example.getTgtTableType()) + .targetHdfsPath(example.getTgtHdfsPath()) + .sourceTasks(example.getSourceTasks()) + .writeTasks(example.getWriteTasks()) + .writeOperation(example.getWriteOperation()) + .writeTaskMaxMemory(example.getWriteTaskMaxMemory()) + .writeBatchSize(example.getWriteBatchSize()) + .writeRateLimit(example.getWriteRateLimit()) + .bucketIndexNumber(example.getBucketIndexNumber()) + .compactionStrategy(example.getCompactionStrategy()) + .compactionTasks(example.getCompactionTasks()) + .compactionDeltaCommits(example.getCompactionDeltaCommits()) + .compactionDeltaSeconds(example.getCompactionDeltaSeconds()) + .compactionAsyncEnabled(example.getCompactionAsyncEnabled()) + .compactionMaxMemory(example.getCompactionMaxMemory()) + .configs(example.getConfigs()) + .keepFileVersion(example.getKeepFileVersion()) + .keepCommitVersion(example.getKeepCommitVersion()) + .build(); + TableMeta.YarnMeta syncYarnMeta = TableMeta.YarnMeta.builder() + .jobManagerMemory(example.getSyncJobManagerMemory()) + .taskManagerMemory(example.getSyncTaskManagerMemory()) + .build(); + TableMeta.YarnMeta compactionYarnMeta = TableMeta.YarnMeta.builder() + .jobManagerMemory(example.getCompactionJobManagerMemory()) + .taskManagerMemory(example.getCompactionTaskManagerMemory()) + .build(); + TableMeta.ConfigMeta configMeta = TableMeta.ConfigMeta.builder() + .messageId(example.getMessageId()) + .metricPublishUrl(example.getMetricPublishUrl()) + .metricPrometheusUrl(example.getMetricPrometheusUrl()) + .metricApiUrl(example.getMetricApiUrl()) + .metricPublishDelay(example.getMetricPublishDelay()) + .metricPublishPeriod(example.getMetricPublishPeriod()) + .metricPublishTimeout(example.getMetricPublishTimeout()) + .metricPublishBatch(example.getMetricPublishBatch()) + .checkpointRootPath(example.getCheckpointRootPath()) + .zookeeperUrl(example.getZookeeperUrl()) + .build(); + TableMeta.JobMeta jobMeta = TableMeta.JobMeta.builder() + .id(example.getJobId()) + .name(example.getJobName()) + .build(); + + TableMeta.ConnectionMeta connectionMeta = null; + String connectionText = example.getConnection(); + if (StrUtil.isNotBlank(connectionText)) { + JSONObject connectionObj = JSONUtil.parseObj(connectionText); + connectionMeta = TableMeta.ConnectionMeta.builder() + .url(connectionObj.getStr("jdbc_url")) + .user(connectionObj.getStr("jdbc_user")) + .password(connectionObj.getStr("jdbc_password")) + .driver(connectionObj.getStr("jdbc_driver")) + .build(); + } + + ImmutableList partitionFields = rows.collect(TableMeta.RowMeta::getPartitionField).distinct(); + checkEmptyOrMoreThanOne("partition_field", filterFields); + String partitionField = partitionFields.get(0); + + List primaryKeys = Lists.mutable.empty(), + partitionKeys = Lists.mutable.empty(), + fieldMetaList = Lists.mutable.empty(); + for (TableMeta.RowMeta rowMeta : rows) { + boolean isPrimaryKey = StrUtil.equals(STATUS_Y, rowMeta.getPrimaryKey()); + boolean isPartitionKey = StrUtil.equals(STATUS_Y, rowMeta.getPartitionKey()); + TableMeta.FieldMeta fieldMeta = TableMeta.FieldMeta.builder() + .name(rowMeta.getFieldName().toUpperCase(Locale.ROOT)) + .sequence(rowMeta.getFieldSeq()) + .type(rowMeta.getFieldType()) + .isPrimaryKey(isPrimaryKey) + .partitionKey(isPartitionKey) + .length(rowMeta.getLength()) + .scala(rowMeta.getScala()) + .build(); + if (isPrimaryKey) { + primaryKeys.add(fieldMeta); + } + if (isPartitionKey) { + partitionKeys.add(fieldMeta); + } + fieldMetaList.add(fieldMeta); + } + return TableMeta.builder() + .alias(alias) + .source(dataSource) + .schema(schema) + .table(table) + .type(type) + .primaryKeys(primaryKeys) + .partitionKeys(partitionKeys) + .hudi(hudiMeta) + .fields(fieldMetaList) + .filterField(filterField) + .filterValues(filterValues.toList()) + .filterType(filterType) + .topic(topic) + .pulsarAddress(pulsarAddress) + .syncYarn(syncYarnMeta) + .compactionYarn(compactionYarnMeta) + .partitionField(partitionField) + .config(configMeta) + .job(jobMeta) + .connection(connectionMeta) + .priority(priority) + .sourceType(sourceType) + .tags(tags.toList()) + .version(version) + .build(); + } catch (Throwable throwable) { + throw new RuntimeException(throwable); + } + }) + .toList() + .toImmutable(); + } + + @Cacheable(value = "table-metas", sync = true) + @Retryable(Throwable.class) + public ImmutableList tableMetas() { + return flinkJobService.flinkJobs().flatCollect(job -> tableMetaList(job.getId())); + } + + @Cacheable(value = "table-metas", sync = true, key = "#flinkJobId") + @Retryable(Throwable.class) + public ImmutableList tableMetas(Long flinkJobId) { + return tableMetaList(flinkJobId); + } + + @Cacheable(value = "table-metas", sync = true, key = "#flinkJobId.toString()+#alias") + @Retryable(Throwable.class) + public TableMeta tableMeta(Long flinkJobId, String alias) { + return tableMetaList(flinkJobId, alias) + .getFirstOptional() + .orElseThrow(TableMetaNotFoundException::new); + } + + @Cacheable(value = "table-count", sync = true) + @Retryable(Throwable.class) + public Long tableCount() { + return mysqlJdbcTemplate.queryForObject( + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .build(), + Long.class + ); + } + + @Cacheable(value = "table-focus-count", sync = true) + @Retryable(Throwable.class) + public Long tableFocusCount() { + return mysqlJdbcTemplate.queryForObject( + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) + .build(), + Long.class + ); + } + + @Cacheable(value = "hudi-count", sync = true) + @Retryable(Throwable.class) + public Long hudiCount() { + return mysqlJdbcTemplate.queryForObject( + SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .build(), + Long.class + ); + } + + @Cacheable(value = "hudi-focus-count", sync = true) + @Retryable(Throwable.class) + public Long hudiFocusCount() { + return mysqlJdbcTemplate.queryForObject( + SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) + .build(), + Long.class + ); + } + + @Cacheable(value = "hive-count", sync = true) + @Retryable(Throwable.class) + public Long hiveCount() { + return mysqlJdbcTemplate.queryForObject( + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .build(), + Long.class + ); + } + + @Cacheable(value = "hive-focus-count", sync = true) + @Retryable(Throwable.class) + public Long hiveFocusCount() { + return mysqlJdbcTemplate.queryForObject( + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) + .build(), + Long.class + ); + } + + @Cacheable(value = "simple-table-metas", sync = true, cacheManager = "long-cache") + @Retryable(Throwable.class) + public ImmutableList simpleTableMetas(Long flinkJobId, String alias) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + SqlBuilder + .select( + TbAppFlinkJobConfig.ID_A, + TbAppFlinkJobConfig.NAME_A, + TbAppCollectTableInfo.ALIAS_A, + TbAppCollectTableInfo.SRC_SCHEMA_A, + TbAppCollectTableInfo.SRC_TABLE_A, + TbAppCollectTableInfo.TGT_DB_A, + TbAppCollectTableInfo.TGT_TABLE_A, + TbAppCollectTableInfo.TGT_HDFS_PATH_A + ) + .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) + .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) + .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) + .build(), + (rs, row) -> new SimpleTableMeta( + rs.getLong(1), + rs.getString(2), + rs.getString(3), + rs.getString(4), + rs.getString(5), + rs.getString(6), + rs.getString(7), + rs.getString(8) + ) + ) + ); + } + + @Cacheable(value = "exists-table", sync = true) + @Retryable(Throwable.class) + public Boolean existsTable(Long flinkJobId, String alias) { + return mysqlJdbcTemplate.queryForObject( + SqlBuilder.select("count(*) > 0") + .from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_) + .whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) + .andEq(TbAppFlinkJobConfig.ID_A, flinkJobId) + .andEq(TbAppCollectTableInfo.ALIAS_A, alias) + .andEq(TbAppFlinkJobConfig.STATUS_A, "y") + .andEq(TbAppCollectTableInfo.STATUS_A, "y") + .build(), + Boolean.class + ); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/VersionService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/VersionService.java new file mode 100644 index 0000000..9bec384 --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/VersionService.java @@ -0,0 +1,354 @@ +package com.lanyuanxiaoyao.service.info.service; + +import club.kingon.sql.builder.SelectSqlBuilder; +import club.kingon.sql.builder.SqlBuilder; +import club.kingon.sql.builder.WhereSqlBuilder; +import club.kingon.sql.builder.entry.Column; +import cn.hutool.core.lang.Pair; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; +import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; +import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; +import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider; +import java.util.List; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.map.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionTemplate; + +import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; + +/** + * Version + * + * @author lanyuanxiaoyao + * @date 2024-01-03 + */ +@CacheConfig(cacheManager = "normal-cache") +@Service +public class VersionService extends BaseService { + private static final Logger logger = LoggerFactory.getLogger(VersionService.class); + private final JdbcTemplate mysqlJdbcTemplate; + private final TransactionTemplate mysqlTransactionTemplate; + private final SQLLoggerProvider.SQLLogger sqlLogger; + + public VersionService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger) { + this.mysqlJdbcTemplate = mysqlJdbcTemplate; + this.mysqlTransactionTemplate = mysqlTransactionTemplate; + this.sqlLogger = sqlLogger; + } + + private static String generateVersionTableIdCriteria(Boolean scheduled) { + return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)) + .from( + TbAppCollectTableVersion._alias_, + TbAppFlinkJobConfig._alias_, + TbAppCollectTableInfo._alias_ + ) + .whereEq(TbAppCollectTableVersion.SCHEDULED_A, scheduled) + .andEq(TbAppCollectTableVersion.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) + .andEq(TbAppCollectTableVersion.ALIAS_A, Column.as(TbAppCollectTableInfo.ALIAS_A)) + .andEq(TbAppCollectTableVersion.VERSION_A, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')")) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .build(); + } + + private SqlBuilder generateUnScheduledNormalTableSql(SelectSqlBuilder builder, String version) { + return builder + .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_, TbAppCollectTableVersion._alias_) + .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, "y") + .andEq(TbAppCollectTableInfo.STATUS_A, "y") + .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) + .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) + .andEq(TbAppCollectTableInfo.PRIORITY_A, 10000) + .andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false) + .andEq(TbAppCollectTableVersion.VERSION_A, "2018"); + } + + @Cacheable(value = "un-scheduled-normal-table", sync = true) + @Retryable(Throwable.class) + public ImmutableList unScheduledNormalTable(String version) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + generateUnScheduledNormalTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) + .build(), + (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) + ) + ); + } + + @Cacheable(value = "un_scheduled_normal_table_count", sync = true) + @Retryable(Throwable.class) + public Long unScheduledNormalTableCount(String version) { + return mysqlJdbcTemplate.queryForObject( + generateUnScheduledNormalTableSql(SqlBuilder.select(COUNT), version) + .build(), + Long.class + ); + } + + private SqlBuilder generateUnScheduledFocusTableSql(SelectSqlBuilder builder, String version) { + return builder + .from(TbAppCollectTableInfo._alias_) + .join(TbAppCollectTableVersion._alias_) + .onEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) + .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) + .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000) + .andEq(TbAppCollectTableVersion.SCHEDULED_A, false) + .andEq(TbAppCollectTableVersion.VERSION_A, version) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y); + } + + @Cacheable(value = "un-scheduled-focus-table", sync = true) + @Retryable(Throwable.class) + public ImmutableList unScheduledFocusTable(String version) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + generateUnScheduledFocusTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) + .build(), + (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) + ) + ); + } + + @Cacheable(value = "un_scheduled_focus_table_count", sync = true) + @Retryable(Throwable.class) + public Long unScheduledFocusTableCount(String version) { + return mysqlJdbcTemplate.queryForObject( + generateUnScheduledFocusTableSql(SqlBuilder.select(COUNT), version) + .build(), + Long.class + ); + } + + @Cacheable("un-updated-version-table") + @Retryable(Throwable.class) + public ImmutableList nonUpdatedVersionTables() { + return mysqlTransactionTemplate.execute(status -> { + String listSQL = generateVersionTableIdCriteria(false); + sqlLogger.log(listSQL, "nonUpdatedVersionTables"); + List ids = mysqlJdbcTemplate.queryForList(listSQL, String.class); + return Lists.immutable.ofAll(ids); + }); + } + + @Cacheable("updated-version-table") + @Retryable(Throwable.class) + public ImmutableList updatedVersionTables() { + return mysqlTransactionTemplate.execute(status -> { + String listSQL = generateVersionTableIdCriteria(true); + sqlLogger.log(listSQL, "updatedVersionTables"); + List ids = mysqlJdbcTemplate.queryForList(listSQL, String.class); + return Lists.immutable.ofAll(ids); + }); + } + + private SqlBuilder generateVersionTableCriteria( + SelectSqlBuilder builder, + Integer page, + Integer count, + String version, + Long flinkJobId, + String alias, + String order, + String direction, + ImmutableList filterSchedules, + boolean limited, + boolean groupBy + ) { + int limit = Math.max(count, 1); + int offset = limit * Math.max(page - 1, 0); + WhereSqlBuilder root = builder + .from( + TbAppFlinkJobConfig._alias_, + TbAppCollectTableInfo._alias_, + TbAppCollectTableVersion._alias_, + TbAppHudiSyncState._alias_ + ) + .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.ALIAS_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, "y") + .andEq(TbAppCollectTableInfo.STATUS_A, "y") + .andEq(TbAppCollectTableVersion.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) + .andEq(TbAppCollectTableVersion.ALIAS_A, Column.as(TbAppCollectTableInfo.ALIAS_A)) + .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("CONCAT({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) + .andLike(ObjectUtil.isNotNull(flinkJobId), TbAppCollectTableInfo.FLINK_JOB_ID_A, flinkJobId) + .andLike(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) + .andEq(StrUtil.isNotBlank(version), TbAppCollectTableVersion.VERSION_A, version) + .andIn(ObjectUtil.isNotEmpty(filterSchedules), TbAppCollectTableVersion.SCHEDULED_A, filterSchedules); + if (groupBy) { + return root.groupBy(TbAppCollectTableVersion.SCHEDULED_A); + } else { + return root + .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) + .limit(limited, offset, count); + } + } + + @Cacheable(value = "version-tables", sync = true) + @Retryable(Throwable.class) + public PageResponse findAllVersionTables( + Integer page, + Integer count, + String version, + Long flinkJobId, + String alias, + String order, + String direction, + ImmutableList filterSchedules + ) { + return mysqlTransactionTemplate.execute(status -> { + Long total = mysqlJdbcTemplate.queryForObject( + generateVersionTableCriteria( + SqlBuilder.select(COUNT), + page, + count, + version, + flinkJobId, + alias, + order, + direction, + filterSchedules, + false, + false + ).build(), + Long.class + ); + List> groupMap = mysqlJdbcTemplate.query( + generateVersionTableCriteria( + SqlBuilder.select(TbAppCollectTableVersion.SCHEDULED_A, COUNT), + page, + count, + version, + flinkJobId, + alias, + order, + direction, + filterSchedules, + false, + true + ).build(), + (rs, row) -> new Pair<>(rs.getBoolean(1), rs.getInt(2)) + ); + ImmutableMap scheduleCount = Lists.immutable.ofAll(groupMap) + .groupBy(Pair::getKey) + .toMap() + .collectValues((key, list) -> list.getOnly().getValue()) + .toImmutable(); + String listSQL = generateVersionTableCriteria( + SqlBuilder.select( + TbAppCollectTableInfo.FLINK_JOB_ID_A, + TbAppCollectTableInfo.ALIAS_A, + TbAppCollectTableVersion.VERSION_A, + TbAppCollectTableVersion.SCHEDULED_A + ), + page, + count, + version, + flinkJobId, + alias, + order, + direction, + filterSchedules, + true, + false + ).build(); + sqlLogger.log(listSQL, "findAllVersionTables"); + List list = mysqlJdbcTemplate.query( + listSQL, + (rs, row) -> new VersionUpdated( + rs.getLong(1), + rs.getString(2), + rs.getString(3), + rs.getBoolean(4) + ) + ); + return new PageResponse<>(list, total) + .withMetadata("scheduled", scheduleCount.getOrDefault(true, 0)) + .withMetadata("unScheduled", scheduleCount.getOrDefault(false, 0)); + }); + } + + private SqlBuilder generateUnReceiveVersionNormalTableSql(SelectSqlBuilder builder, String version) { + return builder + .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) + .whereLt(TbAppCollectTableInfo.PRIORITY_A, 10000) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andNotIn( + StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A), + SqlBuilder.select(StrUtil.format("concat({}, {})", TbAppCollectTableVersion.FLINK_JOB_ID_A, TbAppCollectTableVersion.ALIAS_A)) + .from(TbAppCollectTableVersion._alias_) + .whereEq(TbAppCollectTableVersion.VERSION_A, 1) + ); + } + + @Cacheable(value = "un-receive-version-normal-table", sync = true) + @Retryable(Throwable.class) + public ImmutableList unReceiveVersionNormalTable(String version) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + generateUnReceiveVersionNormalTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) + .build(), + (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) + ) + ); + } + + @Cacheable(value = "un-receive-version-normal-table-count", sync = true) + @Retryable(Throwable.class) + public Long unReceiveVersionNormalTableCount(String version) { + return mysqlJdbcTemplate.queryForObject( + generateUnReceiveVersionNormalTableSql(SqlBuilder.select(COUNT), version) + .build(), + Long.class + ); + } + + private SqlBuilder generateUnReceiveVersionFocusTable(SelectSqlBuilder builder, String version) { + return builder + .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) + .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000) + .andEq(TbAppCollectTableInfo.STATUS_A, "y") + .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andNotIn( + StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A), + SqlBuilder.select(StrUtil.format("concat({}, {})", TbAppCollectTableVersion.FLINK_JOB_ID_A, TbAppCollectTableVersion.ALIAS_A)) + .from(TbAppCollectTableVersion._alias_) + .whereEq(TbAppCollectTableVersion.VERSION_A, version) + ); + } + + @Cacheable(value = "un-receive-version-focus-table", sync = true) + @Retryable(Throwable.class) + public ImmutableList unReceiveVersionFocusTable(String version) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + generateUnReceiveVersionFocusTable(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) + .build(), + (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) + ) + ); + } + + @Cacheable(value = "un-receive-version-focus-table-count", sync = true) + @Retryable(Throwable.class) + public Long unReceiveVersionFocusTableCount(String version) { + return mysqlJdbcTemplate.queryForObject( + generateUnReceiveVersionFocusTable(SqlBuilder.select(COUNT), version) + .build(), + Long.class + ); + } +}