refactor(info-query): 调整代码分类

将大的service类拆分到小的类中,方便后期维护和功能调整
This commit is contained in:
2024-01-03 11:14:37 +08:00
parent df5061f77e
commit 5b00b5faaf
13 changed files with 1666 additions and 1306 deletions

View File

@@ -0,0 +1,53 @@
package com.lanyuanxiaoyao.service.info.controller;
import cn.hutool.core.util.ObjectUtil;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics;
import com.lanyuanxiaoyao.service.info.service.CompactionService;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints exposing Hudi compaction metrics.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-03
 */
@RestController
@RequestMapping("/info")
public class CompactionController {
    private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);

    private final CompactionService compactionService;

    public CompactionController(CompactionService compactionService) {
        this.compactionService = compactionService;
    }

    /**
     * Pages through the compaction metrics of one table of one Flink job.
     *
     * @param page            1-based page number (defaults to 1)
     * @param count           page size (defaults to 10)
     * @param flinkJobId      required Flink job id
     * @param alias           required table alias
     * @param order           optional sort column
     * @param direction       optional sort direction
     * @param filterCompletes optional completion-state filter; absent means "no filter"
     */
    @GetMapping("/compaction_metrics")
    public PageResponse<CompactionMetrics> metrics(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "count", defaultValue = "10") Integer count,
            @RequestParam(value = "flink_job_id") Long flinkJobId,
            @RequestParam(value = "alias") String alias,
            @RequestParam(value = "order", required = false) String order,
            @RequestParam(value = "direction", required = false) String direction,
            @RequestParam(value = "filter_completes", required = false) List<Boolean> filterCompletes
    ) {
        // The filter parameter is optional, so it may arrive as null; hand the
        // service an empty immutable list in that case.
        if (ObjectUtil.isNull(filterCompletes)) {
            return compactionService.findAllCompactionMetrics(
                    page, count, flinkJobId, alias, order, direction, Lists.immutable.empty());
        }
        return compactionService.findAllCompactionMetrics(
                page, count, flinkJobId, alias, order, direction, Lists.immutable.ofAll(filterCompletes));
    }
}

View File

@@ -0,0 +1,39 @@
package com.lanyuanxiaoyao.service.info.controller;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.lanyuanxiaoyao.service.info.service.FlinkJobService;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints for listing and inspecting Flink jobs.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-03
 */
@RestController
@RequestMapping("/info")
public class FlinkJobController {
    private static final Logger logger = LoggerFactory.getLogger(FlinkJobController.class);

    private final FlinkJobService flinkJobService;

    public FlinkJobController(FlinkJobService flinkJobService) {
        this.flinkJobService = flinkJobService;
    }

    /** Returns every known Flink job. */
    @GetMapping("/flink_job/list")
    public ImmutableList<FlinkJob> list() {
        return flinkJobService.flinkJobs();
    }

    /** Returns the single Flink job with the given id. */
    @GetMapping("/flink_job/detail")
    public FlinkJob detail(@RequestParam("flink_job_id") Long flinkJobId) {
        return flinkJobService.flinkJob(flinkJobId);
    }
}

View File

@@ -1,12 +1,10 @@
package com.lanyuanxiaoyao.service.info.controller;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.*;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.info.TableInfoSearchCache;
import com.lanyuanxiaoyao.service.info.service.InfoService;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
@@ -25,7 +23,7 @@ import org.springframework.web.bind.annotation.RestController;
* @date 2023-04-24
*/
@RestController
@RequestMapping("info")
@RequestMapping("/info")
public class InfoController {
private static final Logger logger = LoggerFactory.getLogger(InfoController.class);
@@ -60,172 +58,11 @@ public class InfoController {
);
}
@GetMapping("/version_tables")
public PageResponse<VersionUpdated> versionTables(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "version", required = false) String version,
@RequestParam(value = "flink_job_id", required = false) Long flinkJobId,
@RequestParam(value = "alias", required = false) String alias,
@RequestParam(value = "order", required = false) String order,
@RequestParam(value = "direction", required = false) String direction,
@RequestParam(value = "filter_schedules", required = false) List<Boolean> filterSchedules
) {
return infoService.findAllVersionTables(
page,
count,
version,
flinkJobId,
alias,
order,
direction,
Lists.immutable.ofAll(filterSchedules)
);
}
@GetMapping("/job_metas")
public ImmutableList<JobAndMetas> jobAndMetas() {
return infoService.jobAndMetas();
}
@GetMapping("/flink_job/list")
public ImmutableList<FlinkJob> flinkJobs() {
return infoService.flinkJobs();
}
@GetMapping("/flink_job/detail")
public FlinkJob flinkJob(@RequestParam("flink_job_id") Long flinkJobId) {
return infoService.flinkJob(flinkJobId);
}
@GetMapping("/table_meta/list")
public ImmutableList<TableMeta> tableMetas(@RequestParam(value = "flink_job_id", required = false) Long flinkJobId) {
return ObjectUtil.isNull(flinkJobId)
? infoService.tableMetas()
: infoService.tableMetas(flinkJobId);
}
@GetMapping("/table_meta/detail")
public TableMeta tableMeta(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) {
return infoService.tableMeta(flinkJobId, alias);
}
@GetMapping("/sync_state/detail")
public SyncState syncState(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) {
return infoService.syncState(flinkJobId, alias);
}
@GetMapping("/non_updated_version_tables")
public ImmutableList<String> nonUpdatedVersionTables() {
return infoService.nonUpdatedVersionTables();
}
@GetMapping("/updated_version_tables")
public ImmutableList<String> updatedVersionTables() {
return infoService.updatedVersionTables();
}
@GetMapping("/un_receive_version_normal_table")
public ImmutableList<JobIdAndAlias> unReceiveVersionNormalTable(@RequestParam("version") String version) {
return infoService.unReceiveVersionNormalTable(version);
}
@GetMapping("/un_receive_version_normal_table_count")
public Long unReceiveVersionNormalTableCount(@RequestParam("version") String version) {
return infoService.unReceiveVersionNormalTableCount(version);
}
@GetMapping("/un_receive_version_focus_table")
public ImmutableList<JobIdAndAlias> unReceiveVersionFocusTable(@RequestParam("version") String version) {
return infoService.unReceiveVersionFocusTable(version);
}
@GetMapping("/un_receive_version_focus_table_count")
public Long unReceiveVersionFocusTableCount(@RequestParam("version") String version) {
return infoService.unReceiveVersionFocusTableCount(version);
}
@GetMapping("/un_scheduled_normal_table")
public ImmutableList<JobIdAndAlias> unScheduledNormalTable(@RequestParam("version") String version) {
return infoService.unScheduledNormalTable(version);
}
@GetMapping("/un_scheduled_normal_table_count")
public Long unScheduledNormalTableCount(@RequestParam("version") String version) {
return infoService.unScheduledNormalTableCount(version);
}
@GetMapping("/un_scheduled_focus_table")
public ImmutableList<JobIdAndAlias> unScheduledFocusTable(@RequestParam("version") String version) {
return infoService.unScheduledFocusTable(version);
}
@GetMapping("/un_scheduled_focus_table_count")
public Long unScheduledFocusTableCount(@RequestParam("version") String version) {
return infoService.unScheduledFocusTableCount(version);
}
@GetMapping("/table_count")
public Long tableCount() {
return infoService.tableCount();
}
@GetMapping("/table_focus_count")
public Long tableFocusCount() {
return infoService.tableFocusCount();
}
@GetMapping("/hudi_count")
public Long hudiCount() {
return infoService.hudiCount();
}
@GetMapping("/hudi_focus_count")
public Long hudiFocusCount() {
return infoService.hudiFocusCount();
}
@GetMapping("/hive_count")
public Long hiveCount() {
return infoService.hiveCount();
}
@GetMapping("/hive_focus_count")
public Long hiveFocusCount() {
return infoService.hiveFocusCount();
}
@GetMapping("/compaction_metrics")
public PageResponse<CompactionMetrics> compactionMetrics(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "flink_job_id") Long flinkJobId,
@RequestParam(value = "alias") String alias,
@RequestParam(value = "order", required = false) String order,
@RequestParam(value = "direction", required = false) String direction,
@RequestParam(value = "filter_completes", required = false) List<Boolean> filterCompletes
) {
return infoService.findAllCompactionMetrics(
page,
count,
flinkJobId,
alias,
order,
direction,
ObjectUtil.isNull(filterCompletes) ? Lists.immutable.empty() : Lists.immutable.ofAll(filterCompletes)
);
}
@GetMapping("/exists_table")
public Boolean existsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) {
return infoService.existsTable(flinkJobId, alias);
}
@GetMapping("/non_exists_table")
public Boolean nonExistsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) {
return !infoService.existsTable(flinkJobId, alias);
}
@GetMapping("/all_flink_job_id")
public ImmutableList<Long> allFlinkJobId(
@RequestParam(value = "key", required = false) String key,
@@ -257,12 +94,4 @@ public class InfoController {
.collect(TableInfoSearchCache::getHdfs)
.distinct();
}
@GetMapping("/simple_table_metas")
public ImmutableList<SimpleTableMeta> simpleTableMetas(
@RequestParam(value = "flink_job_id", required = false) Long flinkJobId,
@RequestParam(value = "alias", required = false) String alias
) {
return infoService.simpleTableMetas(flinkJobId, alias);
}
}

View File

@@ -0,0 +1,33 @@
package com.lanyuanxiaoyao.service.info.controller;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.lanyuanxiaoyao.service.info.service.SyncStateService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint for querying the synchronisation state of a table.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-03
 */
@RestController
@RequestMapping("/info")
public class SyncStateController {
    private static final Logger logger = LoggerFactory.getLogger(SyncStateController.class);

    private final SyncStateService syncStateService;

    public SyncStateController(SyncStateService syncStateService) {
        this.syncStateService = syncStateService;
    }

    /** Returns the sync state for the table identified by (flinkJobId, alias). */
    @GetMapping("/sync_state/detail")
    public SyncState syncState(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) {
        return syncStateService.syncState(flinkJobId, alias);
    }
}

View File

@@ -0,0 +1,91 @@
package com.lanyuanxiaoyao.service.info.controller;
import cn.hutool.core.util.ObjectUtil;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.info.SimpleTableMeta;
import com.lanyuanxiaoyao.service.info.service.TableMetaService;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints for table metadata lookups and table statistics.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-03
 */
@RestController
@RequestMapping("/info")
public class TableMetaController {
    private static final Logger logger = LoggerFactory.getLogger(TableMetaController.class);

    private final TableMetaService tableMetaService;

    public TableMetaController(TableMetaService tableMetaService) {
        this.tableMetaService = tableMetaService;
    }

    /** Lists table metas; when a job id is supplied only that job's tables are returned. */
    @GetMapping("/table_meta/list")
    public ImmutableList<TableMeta> list(@RequestParam(value = "flink_job_id", required = false) Long flinkJobId) {
        if (ObjectUtil.isNull(flinkJobId)) {
            return tableMetaService.tableMetas();
        }
        return tableMetaService.tableMetas(flinkJobId);
    }

    /** Returns the full meta of the table identified by (flinkJobId, alias). */
    @GetMapping("/table_meta/detail")
    public TableMeta detail(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) {
        return tableMetaService.tableMeta(flinkJobId, alias);
    }

    /** Lists slimmed-down table metas, optionally narrowed by job id and/or alias. */
    @GetMapping("/simple_table_metas")
    public ImmutableList<SimpleTableMeta> simpleList(
            @RequestParam(value = "flink_job_id", required = false) Long flinkJobId,
            @RequestParam(value = "alias", required = false) String alias
    ) {
        return tableMetaService.simpleTableMetas(flinkJobId, alias);
    }

    /** True when the table identified by (flinkJobId, alias) exists. */
    @GetMapping("/exists_table")
    public Boolean existsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) {
        return tableMetaService.existsTable(flinkJobId, alias);
    }

    /** Negation of {@link #existsTable}. */
    @GetMapping("/non_exists_table")
    public Boolean nonExistsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) {
        return !tableMetaService.existsTable(flinkJobId, alias);
    }

    // ---- aggregate counters, all delegated straight to the service ----

    @GetMapping("/table_count")
    public Long tableCount() {
        return tableMetaService.tableCount();
    }

    @GetMapping("/table_focus_count")
    public Long tableFocusCount() {
        return tableMetaService.tableFocusCount();
    }

    @GetMapping("/hudi_count")
    public Long hudiCount() {
        return tableMetaService.hudiCount();
    }

    @GetMapping("/hudi_focus_count")
    public Long hudiFocusCount() {
        return tableMetaService.hudiFocusCount();
    }

    @GetMapping("/hive_count")
    public Long hiveCount() {
        return tableMetaService.hiveCount();
    }

    @GetMapping("/hive_focus_count")
    public Long hiveFocusCount() {
        return tableMetaService.hiveFocusCount();
    }
}

View File

@@ -0,0 +1,106 @@
package com.lanyuanxiaoyao.service.info.controller;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated;
import com.lanyuanxiaoyao.service.info.service.VersionService;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* Version
*
* @author lanyuanxiaoyao
* @date 2024-01-03
*/
@RestController
@RequestMapping("/info")
public class VersionController {
private static final Logger logger = LoggerFactory.getLogger(VersionController.class);
private final VersionService versionService;
public VersionController(VersionService versionService) {
this.versionService = versionService;
}
@GetMapping("/non_updated_version_tables")
public ImmutableList<String> nonUpdatedVersionTables() {
return versionService.nonUpdatedVersionTables();
}
@GetMapping("/updated_version_tables")
public ImmutableList<String> updatedVersionTables() {
return versionService.updatedVersionTables();
}
@GetMapping("/version_tables")
public PageResponse<VersionUpdated> list(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "version", required = false) String version,
@RequestParam(value = "flink_job_id", required = false) Long flinkJobId,
@RequestParam(value = "alias", required = false) String alias,
@RequestParam(value = "order", required = false) String order,
@RequestParam(value = "direction", required = false) String direction,
@RequestParam(value = "filter_schedules", required = false) List<Boolean> filterSchedules
) {
return versionService.findAllVersionTables(
page,
count,
version,
flinkJobId,
alias,
order,
direction,
Lists.immutable.ofAll(filterSchedules)
);
}
@GetMapping("/un_scheduled_normal_table")
public ImmutableList<JobIdAndAlias> unScheduledNormalTable(@RequestParam("version") String version) {
return versionService.unScheduledNormalTable(version);
}
@GetMapping("/un_scheduled_normal_table_count")
public Long unScheduledNormalTableCount(@RequestParam("version") String version) {
return versionService.unScheduledNormalTableCount(version);
}
@GetMapping("/un_scheduled_focus_table")
public ImmutableList<JobIdAndAlias> unScheduledFocusTable(@RequestParam("version") String version) {
return versionService.unScheduledFocusTable(version);
}
@GetMapping("/un_scheduled_focus_table_count")
public Long unScheduledFocusTableCount(@RequestParam("version") String version) {
return versionService.unScheduledFocusTableCount(version);
}
@GetMapping("/un_receive_version_normal_table")
public ImmutableList<JobIdAndAlias> unReceiveVersionNormalTable(@RequestParam("version") String version) {
return versionService.unReceiveVersionNormalTable(version);
}
@GetMapping("/un_receive_version_normal_table_count")
public Long unReceiveVersionNormalTableCount(@RequestParam("version") String version) {
return versionService.unReceiveVersionNormalTableCount(version);
}
@GetMapping("/un_receive_version_focus_table")
public ImmutableList<JobIdAndAlias> unReceiveVersionFocusTable(@RequestParam("version") String version) {
return versionService.unReceiveVersionFocusTable(version);
}
@GetMapping("/un_receive_version_focus_table_count")
public Long unReceiveVersionFocusTableCount(@RequestParam("version") String version) {
return versionService.unReceiveVersionFocusTableCount(version);
}
}

View File

@@ -0,0 +1,20 @@
package com.lanyuanxiaoyao.service.info.service;
import club.kingon.sql.builder.entry.Alias;
import cn.hutool.core.util.StrUtil;
/**
 * Shared constants and helpers for the info-query services.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-03
 */
public class BaseService {
    /** SQL aggregate used by the paging count queries. */
    protected static final String COUNT = "count(*)";
    /** Row-status flag value: enabled. */
    protected static final String STATUS_Y = "y";
    /** Row-status flag value: disabled. */
    protected static final String STATUS_N = "n";

    /**
     * Qualifies a column name with the alias of the given (sub-query) table,
     * e.g. {@code column(m1, "alias")} yields {@code "m1.alias"}.
     */
    protected static String column(Alias table, String columnName) {
        return StrUtil.format("{}.{}", table.getAlias(), columnName);
    }
}

View File

@@ -0,0 +1,191 @@
package com.lanyuanxiaoyao.service.info.service;
import club.kingon.sql.builder.SelectSqlBuilder;
import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.entry.Alias;
import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.SQLConstants;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics;
import java.sql.Timestamp;
import java.util.List;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
/**
 * Read-side queries for Hudi compaction metrics.
 *
 * <p>The metrics table stores two rows per compaction: a "pre" row written when the plan
 * is scheduled and a "complete" row written when the compaction finishes. The page query
 * left-joins the two so each result carries both before and after statistics; an absent
 * "complete" row means the compaction is still running.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-03
 */
@CacheConfig(cacheManager = "normal-cache")
@Service
public class CompactionService extends BaseService {
    private static final Logger logger = LoggerFactory.getLogger(CompactionService.class);

    private final JdbcTemplate mysqlJdbcTemplate;
    private final TransactionTemplate mysqlTransactionTemplate;

    public CompactionService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate) {
        this.mysqlJdbcTemplate = mysqlJdbcTemplate;
        this.mysqlTransactionTemplate = mysqlTransactionTemplate;
    }

    /**
     * Builds the shared FROM / JOIN / WHERE / ORDER BY tail used by both the count
     * query and the page query.
     *
     * @param builder         the SELECT head to append to
     * @param filterCompletes completion filter: {@code true} keeps finished compactions,
     *                        {@code false} keeps unfinished ones; both keeps everything
     * @param limited         whether to append the LIMIT clause (false for the count query)
     */
    private SqlBuilder generateCompactionMetricsCriteria(
            SelectSqlBuilder builder,
            Integer page,
            Integer count,
            Long flinkJobId,
            String alias,
            String order,
            String direction,
            ImmutableList<Boolean> filterCompletes,
            boolean limited
    ) {
        // Clamp paging: page size at least 1, first page at offset 0.
        int limit = Math.max(count, 1);
        int offset = limit * Math.max(page - 1, 0);
        // m1: the "pre" (plan) rows of the requested table.
        Alias m1 = Alias.of(
                SqlBuilder.selectAll()
                        .from(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics._alias_)
                        .whereEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.TYPE_A, "pre")
                        .andEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.FLINK_JOB_ID_A, flinkJobId)
                        .andEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.ALIAS_A, alias),
                "m1"
        );
        // m2: the matching "complete" rows, joined below on the compaction identity.
        Alias m2 = Alias.of(
                SqlBuilder.selectAll()
                        .from(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics._alias_)
                        .whereEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.TYPE_A, "complete")
                        .andEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.FLINK_JOB_ID_A, flinkJobId)
                        .andEq(SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics.ALIAS_A, alias),
                "m2"
        );
        return builder
                .from(m1)
                .leftJoin(m2)
                .onEq(column(m1, "flink_job_id"), Column.as(column(m2, "flink_job_id")))
                .andEq(column(m1, "alias"), Column.as(column(m2, "alias")))
                .andEq(column(m1, "application_id"), Column.as(column(m2, "application_id")))
                .andEq(column(m1, "compaction_plan_instant"), Column.as(column(m2, "compaction_plan_instant")))
                // m2.type IS NOT NULL <=> the compaction has a "complete" row.
                .whereNotNull(filterCompletes.anySatisfy(b -> b), column(m2, "type"))
                .orNull(filterCompletes.anySatisfy(b -> !b), column(m2, "type"))
                .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), StrUtil.format("m1.{} {}", order, direction))
                // FIX: previously passed the raw `count` here, ignoring the clamped
                // `limit` computed above (offset already used it); a count < 1 would
                // have produced LIMIT 0.
                .limit(limited, offset, limit);
    }

    /**
     * Pages through the compaction metrics of one table, pairing each plan with its
     * completion (when present). Cached and retried; both queries run in one transaction
     * for a consistent total/list pair.
     */
    @Cacheable(value = "compaction-metrics", sync = true)
    @Retryable(Throwable.class)
    public PageResponse<CompactionMetrics> findAllCompactionMetrics(
            Integer page,
            Integer count,
            Long flinkJobId,
            String alias,
            String order,
            String direction,
            ImmutableList<Boolean> filterCompletes
    ) {
        return mysqlTransactionTemplate.execute(status -> {
            Long total = mysqlJdbcTemplate.queryForObject(
                    generateCompactionMetricsCriteria(
                            SqlBuilder.select(COUNT),
                            page,
                            count,
                            flinkJobId,
                            alias,
                            order,
                            direction,
                            filterCompletes,
                            false
                    ).build(),
                    Long.class
            );
            List<CompactionMetrics> list = mysqlJdbcTemplate.query(
                    generateCompactionMetricsCriteria(
                            SqlBuilder.select(
                                    "m1.flink_job_id",
                                    "m1.alias",
                                    "m1.application_id",
                                    "m1.cluster",
                                    "m1.compaction_plan_instant",
                                    "m2.type is not null as is_complete",
                                    "m1.update_time as started_time",
                                    "m2.update_time as finished_time",
                                    "m1.total_scan_time as pre_total_scan_time",
                                    "m1.total_log_files_compacted as pre_total_log_files_compacted",
                                    "m1.total_log_files_size as pre_total_log_files_size",
                                    "m1.total_records_deleted as pre_total_records_deleted",
                                    "m1.total_records_updated as pre_total_records_updated",
                                    "m1.total_records_compacted as pre_total_records_compacted",
                                    "m2.total_scan_time as complete_total_scan_time",
                                    "m2.total_log_files_compacted as complete_total_log_files_compacted",
                                    "m2.total_log_files_size as complete_total_log_files_size",
                                    "m2.total_records_deleted as complete_total_records_deleted",
                                    "m2.total_records_updated as complete_total_records_updated",
                                    "m2.total_records_compacted as complete_total_records_compacted"
                            ),
                            page,
                            count,
                            flinkJobId,
                            alias,
                            order,
                            direction,
                            filterCompletes,
                            true
                    ).build(),
                    (rs, row) -> {
                        // FIX: the old ObjectUtil.isNull check on a primitive boolean was
                        // dead code (a primitive autoboxes to a non-null Boolean, so the
                        // check was always false). ResultSet.getBoolean already returns
                        // false for SQL NULL, which is exactly the "no complete row" case.
                        boolean isComplete = rs.getBoolean(6);
                        Timestamp startedTimestamp = rs.getTimestamp(7);
                        long startedTime = 0;
                        if (ObjectUtil.isNotNull(startedTimestamp)) {
                            startedTime = startedTimestamp.getTime();
                        }
                        Timestamp finishedTimestamp = rs.getTimestamp(8);
                        long finishedTime = 0;
                        if (ObjectUtil.isNotNull(finishedTimestamp)) {
                            finishedTime = finishedTimestamp.getTime();
                        }
                        CompactionMetrics.Statistics before = new CompactionMetrics.Statistics(
                                rs.getLong(9),
                                rs.getLong(10),
                                rs.getLong(11),
                                rs.getLong(12),
                                rs.getLong(13),
                                rs.getLong(14)
                        );
                        CompactionMetrics.Statistics after = new CompactionMetrics.Statistics(
                                rs.getLong(15),
                                rs.getLong(16),
                                rs.getLong(17),
                                rs.getLong(18),
                                rs.getLong(19),
                                rs.getLong(20)
                        );
                        return new CompactionMetrics(
                                rs.getLong(1),
                                rs.getString(2),
                                rs.getString(3),
                                rs.getString(4),
                                rs.getString(5),
                                isComplete,
                                startedTime,
                                finishedTime,
                                before,
                                after
                        );
                    }
            );
            return new PageResponse<>(list, total);
        });
    }
}

View File

@@ -0,0 +1,91 @@
package com.lanyuanxiaoyao.service.info.service;
import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.util.ObjectUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.eshore.odcp.hudi.connector.exception.FlinkJobNotFoundException;
import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppYarnJobConfig;
/**
 * Read-side queries for Flink job configuration.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-03
 */
@CacheConfig(cacheManager = "normal-cache")
@Service
public class FlinkJobService extends BaseService {
    private static final Logger logger = LoggerFactory.getLogger(FlinkJobService.class);

    private final JdbcTemplate mysqlJdbcTemplate;

    // NOTE(review): mysqlTransactionTemplate and sqlLogger are injected but never
    // stored or used — candidates for removal once direct call sites are confirmed.
    public FlinkJobService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger) {
        this.mysqlJdbcTemplate = mysqlJdbcTemplate;
    }

    /**
     * Loads enabled Flink jobs joined with their one-in-one Yarn configuration.
     * A null {@code flinkJobId} loads every enabled job; otherwise only that job.
     */
    private ImmutableList<FlinkJob> flinkJobList(Long flinkJobId) {
        String sql = SqlBuilder.select(
                        TbAppFlinkJobConfig.ID_A,
                        TbAppFlinkJobConfig.NAME_A,
                        TbAppFlinkJobConfig.RUN_MODE_A,
                        TbAppYarnJobConfig.JOB_MANAGER_MEMORY_A,
                        TbAppYarnJobConfig.TASK_MANAGER_MEMORY_A
                )
                .from(TbAppFlinkJobConfig._alias_)
                .leftJoin(TbAppYarnJobConfig._alias_)
                .onEq(TbAppFlinkJobConfig.ONE_IN_ONE_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfig.ID_A))
                .whereEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
                .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
                .build();
        List<FlinkJob> jobs = mysqlJdbcTemplate.query(sql, (rs, row) -> {
            FlinkJob.RunMode runMode = parseRunMode(rs.getString(3));
            TableMeta.YarnMeta yarnMeta = TableMeta.YarnMeta.builder()
                    .jobManagerMemory(rs.getInt(4))
                    .taskManagerMemory(rs.getInt(5))
                    .build();
            return FlinkJob.builder()
                    .id(rs.getLong(1))
                    .name(rs.getString(2))
                    .runMode(runMode)
                    .oneInOneSyncYarn(yarnMeta)
                    .build();
        });
        return Lists.immutable.ofAll(jobs);
    }

    /** Maps the stored run-mode text to the enum; unrecognised values fall back to ALL_IN_ONE. */
    private static FlinkJob.RunMode parseRunMode(String runModeText) {
        try {
            return FlinkJob.RunMode.valueOf(runModeText);
        } catch (IllegalArgumentException e) {
            return FlinkJob.RunMode.ALL_IN_ONE;
        }
    }

    /** All enabled Flink jobs (cached, retried). */
    @Cacheable(value = "flink-jobs", sync = true)
    @Retryable(Throwable.class)
    public ImmutableList<FlinkJob> flinkJobs() {
        return flinkJobList(null);
    }

    /**
     * The single enabled Flink job with the given id (cached per id, retried).
     *
     * @throws FlinkJobNotFoundException when no enabled job has that id
     */
    @Cacheable(value = "flink-jobs", sync = true, key = "#flinkJobId")
    @Retryable(Throwable.class)
    public FlinkJob flinkJob(Long flinkJobId) {
        return flinkJobList(flinkJobId)
                .getFirstOptional()
                .orElseThrow(() -> new FlinkJobNotFoundException(flinkJobId));
    }
}

View File

@@ -0,0 +1,91 @@
package com.lanyuanxiaoyao.service.info.service;
import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.exception.SyncStateNotFoundException;
import java.sql.Timestamp;
import java.util.function.Function;
import org.eclipse.collections.api.factory.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*;
/**
 * Read-side query for the synchronisation state of a table.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-03
 */
@CacheConfig(cacheManager = "normal-cache")
@Service
public class SyncStateService extends BaseService {
    private static final Logger logger = LoggerFactory.getLogger(SyncStateService.class);

    private final JdbcTemplate mysqlJdbcTemplate;

    public SyncStateService(JdbcTemplate mysqlJdbcTemplate) {
        this.mysqlJdbcTemplate = mysqlJdbcTemplate;
    }

    /**
     * Loads the sync state of the table identified by (flinkJobId, alias).
     * Cached per (job, alias) pair and retried.
     *
     * @throws SyncStateNotFoundException when no matching row exists
     */
    @Cacheable(value = "sync-state", sync = true, key = "#flinkJobId.toString()+#alias")
    @Retryable(Throwable.class)
    public SyncState syncState(Long flinkJobId, String alias) {
        // Join job config, table info and sync state; the sync-state primary key
        // is the concatenation "<flink_job_id>-<alias>".
        String sql = SqlBuilder.select(
                        TbAppFlinkJobConfig.ID_A,
                        TbAppCollectTableInfo.ALIAS_A,
                        TbAppHudiSyncState.MESSAGE_ID_A,
                        TbAppHudiSyncState.SOURCE_START_TIME_A,
                        TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_A,
                        TbAppHudiSyncState.SOURCE_PUBLISH_TIME_A,
                        TbAppHudiSyncState.SOURCE_OP_TIME_A,
                        TbAppHudiSyncState.COMPACTION_START_TIME_A,
                        TbAppHudiSyncState.COMPACTION_FINISH_TIME_A,
                        TbAppHudiSyncState.COMPACTION_APPLICATION_ID_A,
                        TbAppHudiSyncState.COMPACTION_STATUS_A,
                        TbAppHudiSyncState.COMPACTION_STATUS_TIME_A,
                        TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_A
                )
                .from(
                        TbAppFlinkJobConfig._alias_,
                        TbAppCollectTableInfo._alias_,
                        TbAppHudiSyncState._alias_
                )
                .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
                .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
                .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
                .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
                .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias)
                .build();
        return Lists.immutable.ofAll(
                mysqlJdbcTemplate.query(sql, (rs, row) -> {
                    // Null timestamps are reported as epoch-millis 0.
                    Function<Timestamp, Long> toEpochMillis = ts -> ts == null ? 0 : ts.getTime();
                    return SyncState.builder()
                            .flinkJobId(rs.getLong(1))
                            .alias(rs.getString(2))
                            .messageId(rs.getString(3))
                            .sourceStartTime(toEpochMillis.apply(rs.getTimestamp(4)))
                            .sourceCheckpointTime(toEpochMillis.apply(rs.getTimestamp(5)))
                            .sourcePublishTime(toEpochMillis.apply(rs.getTimestamp(6)))
                            .sourceOperationTime(toEpochMillis.apply(rs.getTimestamp(7)))
                            .compactionStartTime(toEpochMillis.apply(rs.getTimestamp(8)))
                            .compactionFinishTime(toEpochMillis.apply(rs.getTimestamp(9)))
                            .compactionApplicationId(rs.getString(10))
                            .compactionStatus(rs.getString(11))
                            .compactionStatusTime(toEpochMillis.apply(rs.getTimestamp(12)))
                            .compactionLatestOperationTime(toEpochMillis.apply(rs.getTimestamp(13)))
                            .build();
                })
        ).getFirstOptional().orElseThrow(SyncStateNotFoundException::new);
    }
}

View File

@@ -0,0 +1,582 @@
package com.lanyuanxiaoyao.service.info.service;
import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.entry.Alias;
import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.collection.IterUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.eshore.odcp.hudi.connector.exception.ConfigException;
import com.eshore.odcp.hudi.connector.exception.TableMetaNotFoundException;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.info.SimpleTableMeta;
import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider;
import java.util.List;
import java.util.Locale;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*;
import static com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.*;
/**
 * Table Meta query service.
 *
 * <p>Aggregates per-table collection metadata ({@link TableMeta}) by joining the data-source,
 * field, Flink/Hudi/Yarn job-config and global-config tables, and exposes cached count/lookup
 * helpers for source tables, Hudi paths and Hive tables.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-03
 */
@CacheConfig(cacheManager = "normal-cache")
@Service
public class TableMetaService extends BaseService {

    private static final Logger logger = LoggerFactory.getLogger(TableMetaService.class);

    private final JdbcTemplate mysqlJdbcTemplate;
    private final FlinkJobService flinkJobService;

    // NOTE(review): mysqlTransactionTemplate and sqlLogger are injected but never stored or used.
    // The parameters are kept so the constructor signature (and existing Spring wiring) stays
    // unchanged; remove them once all call sites are confirmed.
    public TableMetaService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger, FlinkJobService flinkJobService) {
        this.mysqlJdbcTemplate = mysqlJdbcTemplate;
        this.flinkJobService = flinkJobService;
    }

    /** Throws {@link ConfigException} when {@code iterable} has more than one element. */
    private static void checkMoreThanOne(String fieldName, Iterable<?> iterable) throws ConfigException {
        ConfigException.check(fieldName + " cannot be more than 1", () -> IterUtil.size(iterable) > 1);
    }

    /** Throws {@link ConfigException} when {@code iterable} is empty. */
    private static void checkEmpty(String fieldName, Iterable<?> iterable) throws ConfigException {
        ConfigException.check(fieldName + " cannot be empty", () -> IterUtil.isEmpty(iterable));
    }

    /** Throws {@link ConfigException} unless {@code iterable} contains exactly one element. */
    private static void checkEmptyOrMoreThanOne(String fieldName, Iterable<?> iterable) throws ConfigException {
        checkEmpty(fieldName, iterable);
        checkMoreThanOne(fieldName, iterable);
    }

    /**
     * Loads every {@link TableMeta} configured under the given Flink job.
     *
     * @param flinkJobId Flink job id; {@code null} means no job filter
     */
    public ImmutableList<TableMeta> tableMetaList(Long flinkJobId) {
        return tableMetaList(flinkJobId, null);
    }

    /**
     * Loads table metadata rows and folds them, grouped by table alias, into {@link TableMeta}
     * aggregates. Rows of the same alias share all table-level configuration, so table-level
     * values are taken from the first row of each group after a uniqueness check.
     *
     * @param flinkJobId Flink job id filter; {@code null} disables the filter
     * @param aliasText  table alias filter; blank disables the filter
     */
    private ImmutableList<TableMeta> tableMetaList(Long flinkJobId, String aliasText) {
        return Lists.immutable.ofAll(
                mysqlJdbcTemplate.query(
                        SqlBuilder.select(
                                        DataSource.DS_NAME_A,
                                        DataSource.SCHEMA_NAME_A,
                                        DataSourceTable.TABLE_NAME_A,
                                        DataSourceTable.TABLE_TYPE_A,
                                        DataSourceTableField.FIELD_NAME_A,
                                        DataSourceTableField.FIELD_SEQ_A,
                                        DataSourceTableField.FIELD_TYPE_A,
                                        DataSourceTableField.PRIMARY_KEY_A,
                                        DataSourceTableField.PARTITION_KEY_A,
                                        DataSourceTableField.LENGTH_A,
                                        TbAppCollectTableInfo.TGT_DB_A,
                                        TbAppCollectTableInfo.TGT_TABLE_A,
                                        TbAppCollectTableInfo.TGT_TABLE_TYPE_A,
                                        TbAppCollectTableInfo.TGT_HDFS_PATH_A,
                                        TbAppHudiJobConfig.WRITE_TASKS_A,
                                        TbAppHudiJobConfig.WRITE_OPERATION_A,
                                        TbAppHudiJobConfig.WRITE_TASK_MAX_MEMORY_A,
                                        TbAppHudiJobConfig.WRITE_BATCH_SIZE_A,
                                        TbAppHudiJobConfig.WRITE_RATE_LIMIT_A,
                                        TbAppCollectTableInfo.BUCKET_NUMBER_A,
                                        TbAppHudiJobConfig.COMPACTION_STRATEGY_A,
                                        TbAppHudiJobConfig.COMPACTION_TASKS_A,
                                        TbAppHudiJobConfig.COMPACTION_DELTA_COMMITS_A,
                                        TbAppHudiJobConfig.COMPACTION_DELTA_SECONDS_A,
                                        TbAppHudiJobConfig.COMPACTION_ASYNC_ENABLED_A,
                                        TbAppHudiJobConfig.COMPACTION_MAX_MEMORY_A,
                                        TbAppHudiJobConfig.CONFIGS_A,
                                        TbAppCollectTableInfo.FILTER_FIELD_A,
                                        TbAppCollectTableInfo.FILTER_VALUES_A,
                                        TbAppCollectTableInfo.FILTER_TYPE_A,
                                        TbAppCollectTableInfo.SRC_TOPIC_A,
                                        TbAppCollectTableInfo.SRC_PULSAR_ADDR_A,
                                        Alias.of("tayjc_sync.job_manager_memory", "sync_job_manager_memory"),
                                        Alias.of("tayjc_sync.task_manager_memory", "sync_task_manager_memory"),
                                        Alias.of("tayjc_compaction.job_manager_memory", "compaction_job_manager_memory"),
                                        // Fixed alias typo ("task_manger_momory"); the result set is
                                        // consumed by column index, so the rename is safe.
                                        Alias.of("tayjc_compaction.task_manager_memory", "compaction_task_manager_memory"),
                                        TbAppCollectTableInfo.PARTITION_FIELD_A,
                                        TbAppHudiSyncState.MESSAGE_ID_A,
                                        TbAppGlobalConfig.METRIC_PUBLISH_URL_A,
                                        TbAppGlobalConfig.METRIC_PROMETHEUS_URL_A,
                                        TbAppGlobalConfig.METRIC_API_URL_A,
                                        TbAppGlobalConfig.METRIC_PUBLISH_DELAY_A,
                                        TbAppGlobalConfig.METRIC_PUBLISH_PERIOD_A,
                                        TbAppGlobalConfig.METRIC_PUBLISH_TIMEOUT_A,
                                        TbAppGlobalConfig.METRIC_PUBLISH_BATCH_A,
                                        Alias.of(TbAppFlinkJobConfig.ID_A, "job_id"),
                                        Alias.of(TbAppFlinkJobConfig.NAME_A, "job_name"),
                                        TbAppGlobalConfig.CHECKPOINT_ROOT_PATH_A,
                                        TbAppHudiJobConfig.SOURCE_TASKS_A,
                                        TbAppCollectTableInfo.ALIAS_A,
                                        DataSource.CONNECTION_A,
                                        TbAppCollectTableInfo.PRIORITY_A,
                                        DataSource.DS_TYPE_A,
                                        TbAppHudiJobConfig.KEEP_FILE_VERSION_A,
                                        TbAppHudiJobConfig.KEEP_COMMIT_VERSION_A,
                                        TbAppCollectTableInfo.TAGS_A,
                                        TbAppGlobalConfig.ZK_URL_A,
                                        TbAppCollectTableInfo.VERSION_A,
                                        DataSourceTableField.SCALE_A
                                )
                                .from(
                                        DataSource._alias_,
                                        DataSourceTable._alias_,
                                        DataSourceTableField._alias_,
                                        TbAppFlinkJobConfig._alias_,
                                        TbAppHudiJobConfig._alias_,
                                        // The yarn-config table is joined twice: once for the
                                        // sync job and once for the compaction job.
                                        Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"),
                                        Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"),
                                        TbAppGlobalConfig._alias_,
                                        TbAppCollectTableInfo._alias_,
                                        TbAppHudiSyncState._alias_
                                )
                                .whereEq(DataSource.DS_ROLE_A, "src")
                                .andEq(DataSource.DS_STATE_A, STATUS_Y)
                                .andEq(DataSource.RECORD_STATE_A, STATUS_Y)
                                .andEq(DataSourceTable.DS_ID_A, Column.as(DataSource.DS_ID_A))
                                .andEq(DataSourceTable.RECORD_STATE_A, STATUS_Y)
                                .andEq(DataSourceTableField.TABLE_ID_A, Column.as(DataSourceTable.TABLE_ID_A))
                                .andEq(DataSourceTableField.RECORD_STATE_A, STATUS_Y)
                                .andIn(DataSource.DS_TYPE_A, "udal", "telepg")
                                .andEq(DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A))
                                .andEq(DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A))
                                .andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A))
                                .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
                                .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A))
                                .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id"))
                                .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id"))
                                .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A))
                                // Sync state rows are keyed by "<flink_job_id>-<alias>".
                                .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
                                .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
                                .andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText)
                                .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                                .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
                                .andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y)
                                .andEq("tayjc_sync.status", STATUS_Y)
                                .andEq("tayjc_compaction.status", STATUS_Y)
                                .orderBy(DataSourceTableField.FIELD_SEQ_A)
                                .build(),
                        (rs, row) -> TableMeta.RowMeta.builder()
                                .dsName(rs.getString(1))
                                .schemaName(rs.getString(2))
                                .tableName(rs.getString(3))
                                .tableType(rs.getString(4))
                                .fieldName(rs.getString(5))
                                .fieldSeq(rs.getInt(6))
                                .fieldType(rs.getString(7))
                                .primaryKey(rs.getString(8))
                                .partitionKey(rs.getString(9))
                                .length(rs.getLong(10))
                                .tgtDb(rs.getString(11))
                                .tgtTable(rs.getString(12))
                                .tgtTableType(rs.getString(13))
                                .tgtHdfsPath(rs.getString(14))
                                .writeTasks(rs.getInt(15))
                                .writeOperation(rs.getString(16))
                                .writeTaskMaxMemory(rs.getInt(17))
                                .writeBatchSize(rs.getInt(18))
                                .writeRateLimit(rs.getInt(19))
                                .bucketIndexNumber(rs.getInt(20))
                                .compactionStrategy(rs.getString(21))
                                .compactionTasks(rs.getInt(22))
                                .compactionDeltaCommits(rs.getInt(23))
                                .compactionDeltaSeconds(rs.getInt(24))
                                .compactionAsyncEnabled(rs.getString(25))
                                .compactionMaxMemory(rs.getInt(26))
                                .configs(rs.getString(27))
                                .filterField(rs.getString(28))
                                .filterValues(rs.getString(29))
                                .filterType(rs.getString(30))
                                .topic(rs.getString(31))
                                .pulsarAddress(rs.getString(32))
                                .syncJobManagerMemory(rs.getInt(33))
                                .syncTaskManagerMemory(rs.getInt(34))
                                .compactionJobManagerMemory(rs.getInt(35))
                                .compactionTaskManagerMemory(rs.getInt(36))
                                .partitionField(rs.getString(37))
                                .messageId(rs.getString(38))
                                .metricPublishUrl(rs.getString(39))
                                .metricPrometheusUrl(rs.getString(40))
                                .metricApiUrl(rs.getString(41))
                                .metricPublishDelay(rs.getInt(42))
                                .metricPublishPeriod(rs.getInt(43))
                                .metricPublishTimeout(rs.getInt(44))
                                .metricPublishBatch(rs.getInt(45))
                                .jobId(rs.getLong(46))
                                .jobName(rs.getString(47))
                                .checkpointRootPath(rs.getString(48))
                                .sourceTasks(rs.getInt(49))
                                .alias(rs.getString(50))
                                .connection(rs.getString(51))
                                .priority(rs.getInt(52))
                                .sourceType(rs.getString(53))
                                .keepFileVersion(rs.getInt(54))
                                .keepCommitVersion(rs.getInt(55))
                                .tags(rs.getString(56))
                                .zookeeperUrl(rs.getString(57))
                                .version(rs.getInt(58))
                                .scala(rs.getInt(59))
                                .build()
                )
        )
                .asParallel(ExecutorProvider.EXECUTORS, 5)
                .groupBy(TableMeta.RowMeta::getAlias)
                .multiValuesView()
                .collect(aliasRowMetas -> {
                    try {
                        ImmutableList<TableMeta.RowMeta> rows = aliasRowMetas
                                .toSortedListBy(TableMeta.RowMeta::getFieldSeq)
                                .toImmutable();
                        ImmutableList<String> aliasList = rows.collect(TableMeta.RowMeta::getAlias).distinct();
                        checkEmptyOrMoreThanOne("alias", aliasList);
                        String alias = aliasList.get(0);
                        ImmutableList<String> sourceTypeList = rows.collect(TableMeta.RowMeta::getSourceType).distinct();
                        checkEmptyOrMoreThanOne("source_type", sourceTypeList);
                        String sourceTypeText = sourceTypeList.get(0).toUpperCase();
                        TableMeta.SourceType sourceType;
                        try {
                            sourceType = TableMeta.SourceType.valueOf(sourceTypeText);
                        } catch (IllegalArgumentException e) {
                            throw new Exception("Cannot parse source type " + sourceTypeText);
                        }
                        ImmutableList<String> dsNames = rows.collect(TableMeta.RowMeta::getDsName).distinct();
                        checkEmptyOrMoreThanOne("ds_name", dsNames);
                        String dataSource = dsNames.get(0);
                        ImmutableList<String> schemaNames = rows.collect(TableMeta.RowMeta::getSchemaName).distinct();
                        checkEmptyOrMoreThanOne("schema_name", schemaNames);
                        String schema = schemaNames.get(0);
                        ImmutableList<String> tableNames = rows.collect(TableMeta.RowMeta::getTableName).distinct();
                        // 每次只能获取 1 张表的元信息
                        checkEmptyOrMoreThanOne("table_name", tableNames);
                        String table = tableNames.get(0);
                        ImmutableList<String> tableTypes = rows.collect(TableMeta.RowMeta::getTableType).distinct();
                        checkEmptyOrMoreThanOne("table_type", tableTypes);
                        String type = tableTypes.get(0);
                        ImmutableList<String> filterFields = rows.collect(TableMeta.RowMeta::getFilterField).distinct();
                        checkEmptyOrMoreThanOne("filter_field", filterFields);
                        String filterField = filterFields.get(0);
                        ImmutableList<String> filterValueList = rows.collect(TableMeta.RowMeta::getFilterValues).distinct();
                        checkEmptyOrMoreThanOne("filter_values", filterValueList);
                        String filterValuesText = filterValueList.get(0);
                        ImmutableList<String> filterValues = StrUtil.isBlank(filterValuesText)
                                ? Lists.immutable.empty()
                                : Lists.immutable.of(filterValuesText.split(","));
                        ImmutableList<String> filterTypes = rows.collect(TableMeta.RowMeta::getFilterType).distinct();
                        // BUGFIX: was re-checking filterFields under the name "filter_field",
                        // leaving filter_type cardinality unvalidated.
                        checkEmptyOrMoreThanOne("filter_type", filterTypes);
                        TableMeta.FilterType filterType;
                        try {
                            filterType = TableMeta.FilterType.valueOf(filterTypes.get(0));
                        } catch (IllegalArgumentException e) {
                            // Unknown / blank filter type degrades to NONE instead of failing.
                            filterType = TableMeta.FilterType.NONE;
                        }
                        ImmutableList<String> topics = rows.collect(TableMeta.RowMeta::getTopic).distinct();
                        checkEmptyOrMoreThanOne("topic", topics);
                        String topic = topics.get(0);
                        ImmutableList<String> pulsarAddresses = rows.collect(TableMeta.RowMeta::getPulsarAddress).distinct();
                        checkEmptyOrMoreThanOne("pulsar address", pulsarAddresses);
                        String pulsarAddress = pulsarAddresses.get(0);
                        ImmutableList<Integer> priorities = rows.collect(TableMeta.RowMeta::getPriority).distinct();
                        checkEmptyOrMoreThanOne("priority", priorities);
                        Integer priority = priorities.get(0);
                        ImmutableList<String> tagTexts = rows.collect(TableMeta.RowMeta::getTags).distinct();
                        checkEmptyOrMoreThanOne("tags", tagTexts);
                        String tagText = ObjectUtil.isNull(tagTexts.get(0)) ? "" : tagTexts.get(0);
                        ImmutableList<String> tags = Lists.immutable.of(tagText.split(","));
                        ImmutableList<Integer> versions = rows.collect(TableMeta.RowMeta::getVersion).distinct();
                        checkEmptyOrMoreThanOne("version", versions);
                        Integer version = versions.get(0);
                        // 获取 Hudi 配置, 因为查出来同一张表的配置都相同, 所以直接取第一条即可
                        TableMeta.RowMeta example = rows.get(0);
                        TableMeta.HudiMeta hudiMeta = TableMeta.HudiMeta.builder()
                                .targetDataSource(example.getTgtDb())
                                .targetTable(example.getTgtTable())
                                .targetTableType(example.getTgtTableType())
                                .targetHdfsPath(example.getTgtHdfsPath())
                                .sourceTasks(example.getSourceTasks())
                                .writeTasks(example.getWriteTasks())
                                .writeOperation(example.getWriteOperation())
                                .writeTaskMaxMemory(example.getWriteTaskMaxMemory())
                                .writeBatchSize(example.getWriteBatchSize())
                                .writeRateLimit(example.getWriteRateLimit())
                                .bucketIndexNumber(example.getBucketIndexNumber())
                                .compactionStrategy(example.getCompactionStrategy())
                                .compactionTasks(example.getCompactionTasks())
                                .compactionDeltaCommits(example.getCompactionDeltaCommits())
                                .compactionDeltaSeconds(example.getCompactionDeltaSeconds())
                                .compactionAsyncEnabled(example.getCompactionAsyncEnabled())
                                .compactionMaxMemory(example.getCompactionMaxMemory())
                                .configs(example.getConfigs())
                                .keepFileVersion(example.getKeepFileVersion())
                                .keepCommitVersion(example.getKeepCommitVersion())
                                .build();
                        TableMeta.YarnMeta syncYarnMeta = TableMeta.YarnMeta.builder()
                                .jobManagerMemory(example.getSyncJobManagerMemory())
                                .taskManagerMemory(example.getSyncTaskManagerMemory())
                                .build();
                        TableMeta.YarnMeta compactionYarnMeta = TableMeta.YarnMeta.builder()
                                .jobManagerMemory(example.getCompactionJobManagerMemory())
                                .taskManagerMemory(example.getCompactionTaskManagerMemory())
                                .build();
                        TableMeta.ConfigMeta configMeta = TableMeta.ConfigMeta.builder()
                                .messageId(example.getMessageId())
                                .metricPublishUrl(example.getMetricPublishUrl())
                                .metricPrometheusUrl(example.getMetricPrometheusUrl())
                                .metricApiUrl(example.getMetricApiUrl())
                                .metricPublishDelay(example.getMetricPublishDelay())
                                .metricPublishPeriod(example.getMetricPublishPeriod())
                                .metricPublishTimeout(example.getMetricPublishTimeout())
                                .metricPublishBatch(example.getMetricPublishBatch())
                                .checkpointRootPath(example.getCheckpointRootPath())
                                .zookeeperUrl(example.getZookeeperUrl())
                                .build();
                        TableMeta.JobMeta jobMeta = TableMeta.JobMeta.builder()
                                .id(example.getJobId())
                                .name(example.getJobName())
                                .build();
                        TableMeta.ConnectionMeta connectionMeta = null;
                        String connectionText = example.getConnection();
                        if (StrUtil.isNotBlank(connectionText)) {
                            JSONObject connectionObj = JSONUtil.parseObj(connectionText);
                            connectionMeta = TableMeta.ConnectionMeta.builder()
                                    .url(connectionObj.getStr("jdbc_url"))
                                    .user(connectionObj.getStr("jdbc_user"))
                                    .password(connectionObj.getStr("jdbc_password"))
                                    .driver(connectionObj.getStr("jdbc_driver"))
                                    .build();
                        }
                        ImmutableList<String> partitionFields = rows.collect(TableMeta.RowMeta::getPartitionField).distinct();
                        // BUGFIX: was checking filterFields here, so partition_field
                        // cardinality was never validated.
                        checkEmptyOrMoreThanOne("partition_field", partitionFields);
                        String partitionField = partitionFields.get(0);
                        List<TableMeta.FieldMeta> primaryKeys = Lists.mutable.empty(),
                                partitionKeys = Lists.mutable.empty(),
                                fieldMetaList = Lists.mutable.empty();
                        for (TableMeta.RowMeta rowMeta : rows) {
                            boolean isPrimaryKey = StrUtil.equals(STATUS_Y, rowMeta.getPrimaryKey());
                            boolean isPartitionKey = StrUtil.equals(STATUS_Y, rowMeta.getPartitionKey());
                            TableMeta.FieldMeta fieldMeta = TableMeta.FieldMeta.builder()
                                    .name(rowMeta.getFieldName().toUpperCase(Locale.ROOT))
                                    .sequence(rowMeta.getFieldSeq())
                                    .type(rowMeta.getFieldType())
                                    .isPrimaryKey(isPrimaryKey)
                                    .partitionKey(isPartitionKey)
                                    .length(rowMeta.getLength())
                                    .scala(rowMeta.getScala())
                                    .build();
                            if (isPrimaryKey) {
                                primaryKeys.add(fieldMeta);
                            }
                            if (isPartitionKey) {
                                partitionKeys.add(fieldMeta);
                            }
                            fieldMetaList.add(fieldMeta);
                        }
                        return TableMeta.builder()
                                .alias(alias)
                                .source(dataSource)
                                .schema(schema)
                                .table(table)
                                .type(type)
                                .primaryKeys(primaryKeys)
                                .partitionKeys(partitionKeys)
                                .hudi(hudiMeta)
                                .fields(fieldMetaList)
                                .filterField(filterField)
                                .filterValues(filterValues.toList())
                                .filterType(filterType)
                                .topic(topic)
                                .pulsarAddress(pulsarAddress)
                                .syncYarn(syncYarnMeta)
                                .compactionYarn(compactionYarnMeta)
                                .partitionField(partitionField)
                                .config(configMeta)
                                .job(jobMeta)
                                .connection(connectionMeta)
                                .priority(priority)
                                .sourceType(sourceType)
                                .tags(tags.toList())
                                .version(version)
                                .build();
                    } catch (Throwable throwable) {
                        // Surface any per-alias assembly failure to the caller of the pipeline.
                        throw new RuntimeException(throwable);
                    }
                })
                .toList()
                .toImmutable();
    }

    /** All table metas across every active Flink job (cached). */
    @Cacheable(value = "table-metas", sync = true)
    @Retryable(Throwable.class)
    public ImmutableList<TableMeta> tableMetas() {
        return flinkJobService.flinkJobs().flatCollect(job -> tableMetaList(job.getId()));
    }

    /** All table metas for one Flink job (cached per job id). */
    @Cacheable(value = "table-metas", sync = true, key = "#flinkJobId")
    @Retryable(Throwable.class)
    public ImmutableList<TableMeta> tableMetas(Long flinkJobId) {
        return tableMetaList(flinkJobId);
    }

    /**
     * Single table meta for a (flinkJobId, alias) pair.
     *
     * @throws TableMetaNotFoundException when the pair does not resolve to a table
     */
    @Cacheable(value = "table-metas", sync = true, key = "#flinkJobId.toString()+#alias")
    @Retryable(Throwable.class)
    public TableMeta tableMeta(Long flinkJobId, String alias) {
        return tableMetaList(flinkJobId, alias)
                .getFirstOptional()
                .orElseThrow(TableMetaNotFoundException::new);
    }

    /** Count of distinct active source tables (schema + table). */
    @Cacheable(value = "table-count", sync = true)
    @Retryable(Throwable.class)
    public Long tableCount() {
        return mysqlJdbcTemplate.queryForObject(
                SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A))
                        .from(TbAppCollectTableInfo._alias_)
                        .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                        .build(),
                Long.class
        );
    }

    /** Count of distinct active "focus" source tables (priority >= 10000). */
    @Cacheable(value = "table-focus-count", sync = true)
    @Retryable(Throwable.class)
    public Long tableFocusCount() {
        return mysqlJdbcTemplate.queryForObject(
                SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A))
                        .from(TbAppCollectTableInfo._alias_)
                        .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                        .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
                        .build(),
                Long.class
        );
    }

    /** Count of distinct active Hudi target paths. */
    @Cacheable(value = "hudi-count", sync = true)
    @Retryable(Throwable.class)
    public Long hudiCount() {
        return mysqlJdbcTemplate.queryForObject(
                SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A))
                        .from(TbAppCollectTableInfo._alias_)
                        .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                        .build(),
                Long.class
        );
    }

    /** Count of distinct active Hudi target paths for "focus" tables (priority >= 10000). */
    @Cacheable(value = "hudi-focus-count", sync = true)
    @Retryable(Throwable.class)
    public Long hudiFocusCount() {
        return mysqlJdbcTemplate.queryForObject(
                SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A))
                        .from(TbAppCollectTableInfo._alias_)
                        .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                        .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
                        .build(),
                Long.class
        );
    }

    /** Count of distinct active Hive targets (db + table). */
    @Cacheable(value = "hive-count", sync = true)
    @Retryable(Throwable.class)
    public Long hiveCount() {
        return mysqlJdbcTemplate.queryForObject(
                SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A))
                        .from(TbAppCollectTableInfo._alias_)
                        .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                        .build(),
                Long.class
        );
    }

    /** Count of distinct active Hive targets for "focus" tables (priority >= 10000). */
    @Cacheable(value = "hive-focus-count", sync = true)
    @Retryable(Throwable.class)
    public Long hiveFocusCount() {
        return mysqlJdbcTemplate.queryForObject(
                SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A))
                        .from(TbAppCollectTableInfo._alias_)
                        .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                        .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
                        .build(),
                Long.class
        );
    }

    /**
     * Lightweight (id/name/alias/source/target) view of active tables, optionally filtered
     * by Flink job id and alias. Cached in the long-lived cache manager.
     */
    @Cacheable(value = "simple-table-metas", sync = true, cacheManager = "long-cache")
    @Retryable(Throwable.class)
    public ImmutableList<SimpleTableMeta> simpleTableMetas(Long flinkJobId, String alias) {
        return Lists.immutable.ofAll(
                mysqlJdbcTemplate.query(
                        SqlBuilder
                                .select(
                                        TbAppFlinkJobConfig.ID_A,
                                        TbAppFlinkJobConfig.NAME_A,
                                        TbAppCollectTableInfo.ALIAS_A,
                                        TbAppCollectTableInfo.SRC_SCHEMA_A,
                                        TbAppCollectTableInfo.SRC_TABLE_A,
                                        TbAppCollectTableInfo.TGT_DB_A,
                                        TbAppCollectTableInfo.TGT_TABLE_A,
                                        TbAppCollectTableInfo.TGT_HDFS_PATH_A
                                )
                                .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_)
                                .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
                                .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
                                .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                                .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
                                .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias)
                                .build(),
                        (rs, row) -> new SimpleTableMeta(
                                rs.getLong(1),
                                rs.getString(2),
                                rs.getString(3),
                                rs.getString(4),
                                rs.getString(5),
                                rs.getString(6),
                                rs.getString(7),
                                rs.getString(8)
                        )
                )
        );
    }

    /** Whether an active table exists for the given (flinkJobId, alias) pair. */
    @Cacheable(value = "exists-table", sync = true)
    @Retryable(Throwable.class)
    public Boolean existsTable(Long flinkJobId, String alias) {
        return mysqlJdbcTemplate.queryForObject(
                SqlBuilder.select("count(*) > 0")
                        .from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_)
                        .whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
                        .andEq(TbAppFlinkJobConfig.ID_A, flinkJobId)
                        .andEq(TbAppCollectTableInfo.ALIAS_A, alias)
                        // Use the shared STATUS_Y constant for consistency with sibling queries
                        // (was a hard-coded "y" literal).
                        .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
                        .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                        .build(),
                Boolean.class
        );
    }
}

View File

@@ -0,0 +1,354 @@
package com.lanyuanxiaoyao.service.info.service;
import club.kingon.sql.builder.SelectSqlBuilder;
import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.WhereSqlBuilder;
import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated;
import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*;
/**
 * Version query service.
 *
 * <p>Queries the per-table version table ({@code TbAppCollectTableVersion}) to report which
 * collect tables have / have not received or scheduled a given data version, plus a paged
 * listing with scheduled/un-scheduled totals.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-03
 */
@CacheConfig(cacheManager = "normal-cache")
@Service
public class VersionService extends BaseService {

    private static final Logger logger = LoggerFactory.getLogger(VersionService.class);

    private final JdbcTemplate mysqlJdbcTemplate;
    private final TransactionTemplate mysqlTransactionTemplate;
    private final SQLLoggerProvider.SQLLogger sqlLogger;

    public VersionService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger) {
        this.mysqlJdbcTemplate = mysqlJdbcTemplate;
        this.mysqlTransactionTemplate = mysqlTransactionTemplate;
        this.sqlLogger = sqlLogger;
    }

    /**
     * SQL selecting "<flink_job_id>-<alias>" ids of active tables whose version row for
     * yesterday (current_date - 1, formatted yyyyMMdd) has the given scheduled flag.
     */
    private static String generateVersionTableIdCriteria(Boolean scheduled) {
        return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))
                .from(
                        TbAppCollectTableVersion._alias_,
                        TbAppFlinkJobConfig._alias_,
                        TbAppCollectTableInfo._alias_
                )
                .whereEq(TbAppCollectTableVersion.SCHEDULED_A, scheduled)
                .andEq(TbAppCollectTableVersion.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
                .andEq(TbAppCollectTableVersion.ALIAS_A, Column.as(TbAppCollectTableInfo.ALIAS_A))
                .andEq(TbAppCollectTableVersion.VERSION_A, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')"))
                .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
                .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                .build();
    }

    /**
     * Criteria for active *normal* (non-focus, priority below 10000) tables whose version row
     * for {@code version} is not yet scheduled.
     */
    private SqlBuilder generateUnScheduledNormalTableSql(SelectSqlBuilder builder, String version) {
        return builder
                .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_, TbAppCollectTableVersion._alias_)
                .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
                .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
                .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A))
                .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A))
                // BUGFIX: "normal" means priority below the 10000 focus threshold
                // (was andEq, matching only the single value 10000).
                .andLt(TbAppCollectTableInfo.PRIORITY_A, 10000)
                // BUGFIX: filter on the version table's scheduled flag, mirroring
                // generateUnScheduledFocusTableSql (was TbAppCollectTableInfo.SCHEDULE_ID_A).
                .andEq(TbAppCollectTableVersion.SCHEDULED_A, false)
                // BUGFIX: use the caller-supplied version (was hard-coded "2018",
                // leaving the version parameter unused).
                .andEq(TbAppCollectTableVersion.VERSION_A, version);
    }

    /** Normal tables not yet scheduled for {@code version}. */
    @Cacheable(value = "un-scheduled-normal-table", sync = true)
    @Retryable(Throwable.class)
    public ImmutableList<JobIdAndAlias> unScheduledNormalTable(String version) {
        return Lists.immutable.ofAll(
                mysqlJdbcTemplate.query(
                        generateUnScheduledNormalTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version)
                                .build(),
                        (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
                )
        );
    }

    /** Count of normal tables not yet scheduled for {@code version}. */
    @Cacheable(value = "un_scheduled_normal_table_count", sync = true)
    @Retryable(Throwable.class)
    public Long unScheduledNormalTableCount(String version) {
        return mysqlJdbcTemplate.queryForObject(
                generateUnScheduledNormalTableSql(SqlBuilder.select(COUNT), version)
                        .build(),
                Long.class
        );
    }

    /**
     * Criteria for active *focus* (priority >= 10000) tables whose version row for
     * {@code version} is not yet scheduled.
     */
    private SqlBuilder generateUnScheduledFocusTableSql(SelectSqlBuilder builder, String version) {
        return builder
                .from(TbAppCollectTableInfo._alias_)
                .join(TbAppCollectTableVersion._alias_)
                .onEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A))
                .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A))
                .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
                .andEq(TbAppCollectTableVersion.SCHEDULED_A, false)
                .andEq(TbAppCollectTableVersion.VERSION_A, version)
                .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y);
    }

    /** Focus tables not yet scheduled for {@code version}. */
    @Cacheable(value = "un-scheduled-focus-table", sync = true)
    @Retryable(Throwable.class)
    public ImmutableList<JobIdAndAlias> unScheduledFocusTable(String version) {
        return Lists.immutable.ofAll(
                mysqlJdbcTemplate.query(
                        generateUnScheduledFocusTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version)
                                .build(),
                        (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
                )
        );
    }

    /** Count of focus tables not yet scheduled for {@code version}. */
    @Cacheable(value = "un_scheduled_focus_table_count", sync = true)
    @Retryable(Throwable.class)
    public Long unScheduledFocusTableCount(String version) {
        return mysqlJdbcTemplate.queryForObject(
                generateUnScheduledFocusTableSql(SqlBuilder.select(COUNT), version)
                        .build(),
                Long.class
        );
    }

    /** "<flink_job_id>-<alias>" ids whose yesterday version row is NOT yet scheduled. */
    @Cacheable("un-updated-version-table")
    @Retryable(Throwable.class)
    public ImmutableList<String> nonUpdatedVersionTables() {
        return mysqlTransactionTemplate.execute(status -> {
            String listSQL = generateVersionTableIdCriteria(false);
            sqlLogger.log(listSQL, "nonUpdatedVersionTables");
            List<String> ids = mysqlJdbcTemplate.queryForList(listSQL, String.class);
            return Lists.immutable.ofAll(ids);
        });
    }

    /** "<flink_job_id>-<alias>" ids whose yesterday version row IS scheduled. */
    @Cacheable("updated-version-table")
    @Retryable(Throwable.class)
    public ImmutableList<String> updatedVersionTables() {
        return mysqlTransactionTemplate.execute(status -> {
            String listSQL = generateVersionTableIdCriteria(true);
            sqlLogger.log(listSQL, "updatedVersionTables");
            List<String> ids = mysqlJdbcTemplate.queryForList(listSQL, String.class);
            return Lists.immutable.ofAll(ids);
        });
    }

    /**
     * Shared criteria for the paged version-table listing, the total count and the per-scheduled
     * group count (same WHERE clause, different SELECT / tail).
     *
     * @param limited whether to append LIMIT/OFFSET paging
     * @param groupBy whether to group by the scheduled flag instead of ordering/paging
     */
    private SqlBuilder generateVersionTableCriteria(
            SelectSqlBuilder builder,
            Integer page,
            Integer count,
            String version,
            Long flinkJobId,
            String alias,
            String order,
            String direction,
            ImmutableList<Boolean> filterSchedules,
            boolean limited,
            boolean groupBy
    ) {
        // Clamp paging inputs: page size at least 1, page index at least 1.
        int limit = Math.max(count, 1);
        int offset = limit * Math.max(page - 1, 0);
        WhereSqlBuilder root = builder
                .from(
                        TbAppFlinkJobConfig._alias_,
                        TbAppCollectTableInfo._alias_,
                        TbAppCollectTableVersion._alias_,
                        TbAppHudiSyncState._alias_
                )
                // BUGFIX: join job-config id to the table's flink_job_id
                // (was joined to the table ALIAS column, which never matches).
                .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
                .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
                .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                .andEq(TbAppCollectTableVersion.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
                .andEq(TbAppCollectTableVersion.ALIAS_A, Column.as(TbAppCollectTableInfo.ALIAS_A))
                .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("CONCAT({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
                // NOTE(review): LIKE on a numeric job id relies on implicit casting;
                // presumably intentional for prefix search — confirm, else switch to andEq.
                .andLike(ObjectUtil.isNotNull(flinkJobId), TbAppCollectTableInfo.FLINK_JOB_ID_A, flinkJobId)
                .andLike(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias)
                .andEq(StrUtil.isNotBlank(version), TbAppCollectTableVersion.VERSION_A, version)
                .andIn(ObjectUtil.isNotEmpty(filterSchedules), TbAppCollectTableVersion.SCHEDULED_A, filterSchedules);
        if (groupBy) {
            return root.groupBy(TbAppCollectTableVersion.SCHEDULED_A);
        } else {
            return root
                    .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction))
                    // BUGFIX: pass the clamped page size (was the raw count, defeating the clamp).
                    .limit(limited, offset, limit);
        }
    }

    /**
     * Paged listing of version rows, with total count plus scheduled/unScheduled subtotals in
     * the response metadata.
     */
    @Cacheable(value = "version-tables", sync = true)
    @Retryable(Throwable.class)
    public PageResponse<VersionUpdated> findAllVersionTables(
            Integer page,
            Integer count,
            String version,
            Long flinkJobId,
            String alias,
            String order,
            String direction,
            ImmutableList<Boolean> filterSchedules
    ) {
        return mysqlTransactionTemplate.execute(status -> {
            // Un-paged total over the same criteria.
            Long total = mysqlJdbcTemplate.queryForObject(
                    generateVersionTableCriteria(
                            SqlBuilder.select(COUNT),
                            page,
                            count,
                            version,
                            flinkJobId,
                            alias,
                            order,
                            direction,
                            filterSchedules,
                            false,
                            false
                    ).build(),
                    Long.class
            );
            // Per-scheduled-flag counts (grouped, un-paged).
            List<Pair<Boolean, Integer>> groupMap = mysqlJdbcTemplate.query(
                    generateVersionTableCriteria(
                            SqlBuilder.select(TbAppCollectTableVersion.SCHEDULED_A, COUNT),
                            page,
                            count,
                            version,
                            flinkJobId,
                            alias,
                            order,
                            direction,
                            filterSchedules,
                            false,
                            true
                    ).build(),
                    (rs, row) -> new Pair<>(rs.getBoolean(1), rs.getInt(2))
            );
            ImmutableMap<Boolean, Integer> scheduleCount = Lists.immutable.ofAll(groupMap)
                    .groupBy(Pair::getKey)
                    .toMap()
                    .collectValues((key, list) -> list.getOnly().getValue())
                    .toImmutable();
            // Paged page of rows.
            String listSQL = generateVersionTableCriteria(
                    SqlBuilder.select(
                            TbAppCollectTableInfo.FLINK_JOB_ID_A,
                            TbAppCollectTableInfo.ALIAS_A,
                            TbAppCollectTableVersion.VERSION_A,
                            TbAppCollectTableVersion.SCHEDULED_A
                    ),
                    page,
                    count,
                    version,
                    flinkJobId,
                    alias,
                    order,
                    direction,
                    filterSchedules,
                    true,
                    false
            ).build();
            sqlLogger.log(listSQL, "findAllVersionTables");
            List<VersionUpdated> list = mysqlJdbcTemplate.query(
                    listSQL,
                    (rs, row) -> new VersionUpdated(
                            rs.getLong(1),
                            rs.getString(2),
                            rs.getString(3),
                            rs.getBoolean(4)
                    )
            );
            return new PageResponse<>(list, total)
                    .withMetadata("scheduled", scheduleCount.getOrDefault(true, 0))
                    .withMetadata("unScheduled", scheduleCount.getOrDefault(false, 0));
        });
    }

    /**
     * Criteria for active normal tables (priority below 10000) with NO version row at all for
     * {@code version} (anti-join via NOT IN on concat(job_id, alias)).
     */
    private SqlBuilder generateUnReceiveVersionNormalTableSql(SelectSqlBuilder builder, String version) {
        return builder
                .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_)
                .whereLt(TbAppCollectTableInfo.PRIORITY_A, 10000)
                .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
                .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
                .andNotIn(
                        StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A),
                        SqlBuilder.select(StrUtil.format("concat({}, {})", TbAppCollectTableVersion.FLINK_JOB_ID_A, TbAppCollectTableVersion.ALIAS_A))
                                .from(TbAppCollectTableVersion._alias_)
                                // BUGFIX: use the caller-supplied version (was hard-coded 1,
                                // mirroring the focus variant which already uses the parameter).
                                .whereEq(TbAppCollectTableVersion.VERSION_A, version)
                );
    }

    /** Normal tables that have not received {@code version} at all. */
    @Cacheable(value = "un-receive-version-normal-table", sync = true)
    @Retryable(Throwable.class)
    public ImmutableList<JobIdAndAlias> unReceiveVersionNormalTable(String version) {
        return Lists.immutable.ofAll(
                mysqlJdbcTemplate.query(
                        generateUnReceiveVersionNormalTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version)
                                .build(),
                        (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
                )
        );
    }

    /** Count of normal tables that have not received {@code version}. */
    @Cacheable(value = "un-receive-version-normal-table-count", sync = true)
    @Retryable(Throwable.class)
    public Long unReceiveVersionNormalTableCount(String version) {
        return mysqlJdbcTemplate.queryForObject(
                generateUnReceiveVersionNormalTableSql(SqlBuilder.select(COUNT), version)
                        .build(),
                Long.class
        );
    }

    /**
     * Criteria for active focus tables (priority >= 10000) with NO version row at all for
     * {@code version}.
     */
    private SqlBuilder generateUnReceiveVersionFocusTable(SelectSqlBuilder builder, String version) {
        return builder
                .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_)
                .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
                // Use the shared STATUS_Y constant for consistency (was a literal "y").
                .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
                .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
                .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
                .andNotIn(
                        StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A),
                        SqlBuilder.select(StrUtil.format("concat({}, {})", TbAppCollectTableVersion.FLINK_JOB_ID_A, TbAppCollectTableVersion.ALIAS_A))
                                .from(TbAppCollectTableVersion._alias_)
                                .whereEq(TbAppCollectTableVersion.VERSION_A, version)
                );
    }

    /** Focus tables that have not received {@code version} at all. */
    @Cacheable(value = "un-receive-version-focus-table", sync = true)
    @Retryable(Throwable.class)
    public ImmutableList<JobIdAndAlias> unReceiveVersionFocusTable(String version) {
        return Lists.immutable.ofAll(
                mysqlJdbcTemplate.query(
                        generateUnReceiveVersionFocusTable(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version)
                                .build(),
                        (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
                )
        );
    }

    /** Count of focus tables that have not received {@code version}. */
    @Cacheable(value = "un-receive-version-focus-table-count", sync = true)
    @Retryable(Throwable.class)
    public Long unReceiveVersionFocusTableCount(String version) {
        return mysqlJdbcTemplate.queryForObject(
                generateUnReceiveVersionFocusTable(SqlBuilder.select(COUNT), version)
                        .build(),
                Long.class
        );
    }
}