feature(info-query): 使用统一的表字段变量查询SQL语句

This commit is contained in:
2023-11-30 12:28:49 +08:00
parent 1a0bc8b7ef
commit b69485aaee
2 changed files with 145 additions and 132 deletions

View File

@@ -8,7 +8,6 @@ import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.lang.Pair; import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.SQLConstants;
import com.eshore.odcp.hudi.connector.entity.FlinkJob; import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.eshore.odcp.hudi.connector.entity.TableMeta;
@@ -16,8 +15,6 @@ import com.eshore.odcp.hudi.connector.utils.database.DatabaseService;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.*; import com.lanyuanxiaoyao.service.configuration.entity.info.*;
import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider; import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.List; import java.util.List;
import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Lists;
@@ -28,15 +25,11 @@ import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.Cacheable; import org.springframework.cache.annotation.Cacheable;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.retry.annotation.Retryable; import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig;
/** /**
* @author lanyuanxiaoyao * @author lanyuanxiaoyao
@@ -48,29 +41,8 @@ import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbApp
public class InfoService { public class InfoService {
private static final Logger logger = LoggerFactory.getLogger(InfoService.class); private static final Logger logger = LoggerFactory.getLogger(InfoService.class);
private static final String COUNT = "count(*)"; private static final String COUNT = "count(*)";
private static final Alias TABLE_VERSION = Alias.of(StrUtil.format("{}.tb_app_collect_table_version", DATABASE_NAME), "tactv"); private static final String STATUS_Y = "y";
private static final String TABLE_VERSION_FLINK_JOB_ID = column(TABLE_VERSION, "flink_job_id"); private static final String STATUS_N = "n";
private static final String TABLE_VERSION_ALIAS = column(TABLE_VERSION, "alias");
private static final String TABLE_VERSION_VERSION = column(TABLE_VERSION, "version");
private static final String TABLE_VERSION_SCHEDULED = column(TABLE_VERSION, "scheduled");
private static final Alias TABLE_INFO = Alias.of(StrUtil.format("{}.tb_app_collect_table_info", DATABASE_NAME), "tacti");
private static final String TABLE_INFO_FLINK_JOB_ID = column(TABLE_INFO, "flink_job_id");
private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias");
private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority");
private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status");
private static final String TABLE_INFO_TARGET_HDFS = column(TABLE_INFO, "tgt_hdfs_path");
private static final String TABLE_INFO_TARGET_TABLE_TYPE = column(TABLE_INFO, "tgt_table_type");
private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss");
private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id");
private static final String TABLE_SYNC_STATE_COMPACTION_STATE = column(TABLE_SYNC_STATE, "compaction_status");
private static final Alias TABLE_COMPACTION_METRICS = Alias.of(StrUtil.format("{}.tb_app_hudi_compaction_metrics", DATABASE_NAME), "tahcm");
private static final String TABLE_COMPACTION_METRICS_TYPE = column(TABLE_COMPACTION_METRICS, "type");
private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id");
private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias");
private static final Alias TABLE_FLINK_JOB = Alias.of(StrUtil.format("{}.tb_app_flink_job_config", DATABASE_NAME), "tafjc");
private static final String TABLE_FLINK_JOB_ID = column(TABLE_FLINK_JOB, "id");
private static final String TABLE_FLINK_JOB_STATUS = column(TABLE_FLINK_JOB, "status");
private static final String TABLE_FLINK_JOB_RUN_MODE = column(TABLE_FLINK_JOB, "run_mode");
private final DatabaseService databaseService; private final DatabaseService databaseService;
private final JdbcTemplate mysqlJdbcTemplate; private final JdbcTemplate mysqlJdbcTemplate;
private final TransactionTemplate mysqlTransactionTemplate; private final TransactionTemplate mysqlTransactionTemplate;
@@ -84,10 +56,18 @@ public class InfoService {
} }
private static String generateVersionTableIdCriteria(Boolean scheduled) { private static String generateVersionTableIdCriteria(Boolean scheduled) {
return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))
.from(TABLE_VERSION) .from(
.whereEq(TABLE_VERSION_SCHEDULED, scheduled) TbAppCollectTableVersion._alias_,
.andEq(TABLE_VERSION_VERSION, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')")) TbAppFlinkJobConfig._alias_,
TbAppCollectTableInfo._alias_
)
.whereEq(TbAppCollectTableVersion.SCHEDULED_A, scheduled)
.andEq(TbAppCollectTableVersion.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
.andEq(TbAppCollectTableVersion.ALIAS_A, Column.as(TbAppCollectTableInfo.ALIAS_A))
.andEq(TbAppCollectTableVersion.VERSION_A, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')"))
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.build(); .build();
} }
@@ -116,14 +96,20 @@ public class InfoService {
) { ) {
int limit = Math.max(count, 1); int limit = Math.max(count, 1);
int offset = limit * Math.max(page - 1, 0); int offset = limit * Math.max(page - 1, 0);
return builder.from(TABLE_FLINK_JOB, TABLE_INFO, TABLE_SYNC_STATE) return builder.from(
.whereEq(TABLE_FLINK_JOB_ID, Column.as(TABLE_INFO_FLINK_JOB_ID)) TbAppFlinkJobConfig._alias_,
.andEq(TABLE_SYNC_STATE_ID, Column.as(StrUtil.format("concat({}, '-', {})", TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS))) TbAppCollectTableInfo._alias_,
.andLike(ObjectUtil.isNotNull(flinkJobId), TABLE_FLINK_JOB_ID, flinkJobId) TbAppHudiSyncState._alias_
.andLike(ObjectUtil.isNotNull(alias), TABLE_INFO_ALIAS, alias) )
.andIn(ObjectUtil.isNotEmpty(selectHudiTableType), TABLE_INFO_TARGET_TABLE_TYPE, selectHudiTableType) .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
.andIn(ObjectUtil.isNotEmpty(selectedRunMode), TABLE_FLINK_JOB_RUN_MODE, selectedRunMode) .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
.andIn(ObjectUtil.isNotEmpty(selectedCompactionStatus), TABLE_SYNC_STATE_COMPACTION_STATE, selectedCompactionStatus) .andLike(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
.andLike(ObjectUtil.isNotNull(alias), TbAppCollectTableInfo.ALIAS_A, alias)
.andIn(ObjectUtil.isNotEmpty(selectHudiTableType), TbAppCollectTableInfo.TGT_TABLE_A, selectHudiTableType)
.andIn(ObjectUtil.isNotEmpty(selectedRunMode), TbAppFlinkJobConfig.RUN_MODE_A, selectedRunMode)
.andIn(ObjectUtil.isNotEmpty(selectedCompactionStatus), TbAppHudiSyncState.COMPACTION_STATUS_A, selectedCompactionStatus)
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction))
.limit(limited, offset, limit); .limit(limited, offset, limit);
} }
@@ -203,7 +189,7 @@ public class InfoService {
Long.class Long.class
); );
String listSQL = generateJobIdAndAliasCriteria( String listSQL = generateJobIdAndAliasCriteria(
SqlBuilder.select(TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS), SqlBuilder.select(TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A),
page, page,
count, count,
flinkJobId, flinkJobId,
@@ -294,19 +280,25 @@ public class InfoService {
) { ) {
int limit = Math.max(count, 1); int limit = Math.max(count, 1);
int offset = limit * Math.max(page - 1, 0); int offset = limit * Math.max(page - 1, 0);
WhereSqlBuilder root = builder.from(TABLE_INFO) WhereSqlBuilder root = builder
.join(TABLE_VERSION) .from(
.onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) TbAppFlinkJobConfig._alias_,
.andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) TbAppCollectTableInfo._alias_,
.andEq(TABLE_INFO_STATUS, "y") TbAppCollectTableVersion._alias_,
.join(TABLE_SYNC_STATE) TbAppHudiSyncState._alias_
.on(StrUtil.format("{} = CONCAT({}, '-', {})", TABLE_SYNC_STATE_ID, TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS)) )
.whereLike(ObjectUtil.isNotNull(flinkJobId), TABLE_INFO_FLINK_JOB_ID, flinkJobId) .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.ALIAS_A))
.andLike(StrUtil.isNotBlank(alias), TABLE_INFO_ALIAS, alias) .andEq(TbAppFlinkJobConfig.STATUS_A, "y")
.andEq(StrUtil.isNotBlank(version), TABLE_VERSION_VERSION, version) .andEq(TbAppCollectTableInfo.STATUS_A, "y")
.andIn(ObjectUtil.isNotEmpty(filterSchedules), TABLE_VERSION_SCHEDULED, filterSchedules); .andEq(TbAppCollectTableVersion.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
.andEq(TbAppCollectTableVersion.ALIAS_A, Column.as(TbAppCollectTableInfo.ALIAS_A))
.andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("CONCAT({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
.andLike(ObjectUtil.isNotNull(flinkJobId), TbAppCollectTableInfo.FLINK_JOB_ID_A, flinkJobId)
.andLike(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias)
.andEq(StrUtil.isNotBlank(version), TbAppCollectTableVersion.VERSION_A, version)
.andIn(ObjectUtil.isNotEmpty(filterSchedules), TbAppCollectTableVersion.SCHEDULED_A, filterSchedules);
if (groupBy) { if (groupBy) {
return root.groupBy(TABLE_VERSION_SCHEDULED); return root.groupBy(TbAppCollectTableVersion.SCHEDULED_A);
} else { } else {
return root return root
.orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction))
@@ -345,7 +337,7 @@ public class InfoService {
); );
List<Pair<Boolean, Integer>> groupMap = mysqlJdbcTemplate.query( List<Pair<Boolean, Integer>> groupMap = mysqlJdbcTemplate.query(
generateVersionTableCriteria( generateVersionTableCriteria(
SqlBuilder.select(TABLE_VERSION_SCHEDULED, COUNT), SqlBuilder.select(TbAppCollectTableVersion.SCHEDULED_A, COUNT),
page, page,
count, count,
version, version,
@@ -366,10 +358,10 @@ public class InfoService {
.toImmutable(); .toImmutable();
String listSQL = generateVersionTableCriteria( String listSQL = generateVersionTableCriteria(
SqlBuilder.select( SqlBuilder.select(
TABLE_INFO_FLINK_JOB_ID, TbAppCollectTableInfo.FLINK_JOB_ID_A,
TABLE_INFO_ALIAS, TbAppCollectTableInfo.ALIAS_A,
TABLE_VERSION_VERSION, TbAppCollectTableVersion.VERSION_A,
TABLE_VERSION_SCHEDULED TbAppCollectTableVersion.SCHEDULED_A
), ),
page, page,
count, count,
@@ -400,14 +392,16 @@ public class InfoService {
private SqlBuilder generateUnReceiveVersionNormalTableSql(SelectSqlBuilder builder, String version) { private SqlBuilder generateUnReceiveVersionNormalTableSql(SelectSqlBuilder builder, String version) {
return builder return builder
.from(TABLE_INFO) .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_)
.whereLt(TABLE_INFO_PRIORITY, 10000) .whereLt(TbAppCollectTableInfo.PRIORITY_A, 10000)
.andEq(TABLE_INFO_STATUS, "y") .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andNotIn( .andNotIn(
StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), StrUtil.format("concat({}, {})", TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A),
SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) SqlBuilder.select(StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableVersion.ALIAS_A))
.from(TABLE_VERSION) .from(TbAppCollectTableVersion._alias_)
.whereEq(TABLE_VERSION_VERSION, version) .whereEq(TbAppCollectTableVersion.VERSION_A, 1)
); );
} }
@@ -416,7 +410,7 @@ public class InfoService {
public ImmutableList<JobIdAndAlias> unReceiveVersionNormalTable(String version) { public ImmutableList<JobIdAndAlias> unReceiveVersionNormalTable(String version) {
return Lists.immutable.ofAll( return Lists.immutable.ofAll(
mysqlJdbcTemplate.query( mysqlJdbcTemplate.query(
generateUnReceiveVersionNormalTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) generateUnReceiveVersionNormalTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version)
.build(), .build(),
(rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
) )
@@ -435,14 +429,16 @@ public class InfoService {
private SqlBuilder generateUnReceiveVersionFocusTable(SelectSqlBuilder builder, String version) { private SqlBuilder generateUnReceiveVersionFocusTable(SelectSqlBuilder builder, String version) {
return builder return builder
.from(TABLE_INFO) .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_)
.whereGe(TABLE_INFO_PRIORITY, 10000) .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
.andEq(TABLE_INFO_STATUS, "y") .andEq(TbAppCollectTableInfo.STATUS_A, "y")
.andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andNotIn( .andNotIn(
StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), StrUtil.format("concat({}, {})", TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A),
SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) SqlBuilder.select(StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableVersion.ALIAS_A))
.from(TABLE_VERSION) .from(TbAppCollectTableVersion._alias_)
.whereEq(TABLE_VERSION_VERSION, version) .whereEq(TbAppCollectTableVersion.VERSION_A, version)
); );
} }
@@ -451,7 +447,7 @@ public class InfoService {
public ImmutableList<JobIdAndAlias> unReceiveVersionFocusTable(String version) { public ImmutableList<JobIdAndAlias> unReceiveVersionFocusTable(String version) {
return Lists.immutable.ofAll( return Lists.immutable.ofAll(
mysqlJdbcTemplate.query( mysqlJdbcTemplate.query(
generateUnReceiveVersionFocusTable(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) generateUnReceiveVersionFocusTable(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version)
.build(), .build(),
(rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
) )
@@ -470,14 +466,15 @@ public class InfoService {
private SqlBuilder generateUnScheduledNormalTableSql(SelectSqlBuilder builder, String version) { private SqlBuilder generateUnScheduledNormalTableSql(SelectSqlBuilder builder, String version) {
return builder return builder
.from(TABLE_INFO) .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_, TbAppCollectTableVersion._alias_)
.join(TABLE_VERSION) .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
.onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) .andEq(TbAppFlinkJobConfig.STATUS_A, "y")
.andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) .andEq(TbAppCollectTableInfo.STATUS_A, "y")
.whereLt(TABLE_INFO_PRIORITY, 10000) .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A))
.andEq(TABLE_VERSION_SCHEDULED, false) .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A))
.andEq(TABLE_VERSION_VERSION, version) .andEq(TbAppCollectTableInfo.PRIORITY_A, 10000)
.andEq(TABLE_INFO_STATUS, "y"); .andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false)
.andEq(TbAppCollectTableVersion.VERSION_A, "2018");
} }
@Cacheable(value = "un-scheduled-normal-table", sync = true) @Cacheable(value = "un-scheduled-normal-table", sync = true)
@@ -485,7 +482,7 @@ public class InfoService {
public ImmutableList<JobIdAndAlias> unScheduledNormalTable(String version) { public ImmutableList<JobIdAndAlias> unScheduledNormalTable(String version) {
return Lists.immutable.ofAll( return Lists.immutable.ofAll(
mysqlJdbcTemplate.query( mysqlJdbcTemplate.query(
generateUnScheduledNormalTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) generateUnScheduledNormalTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version)
.build(), .build(),
(rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
) )
@@ -504,14 +501,14 @@ public class InfoService {
private SqlBuilder generateUnScheduledFocusTableSql(SelectSqlBuilder builder, String version) { private SqlBuilder generateUnScheduledFocusTableSql(SelectSqlBuilder builder, String version) {
return builder return builder
.from(TABLE_INFO) .from(TbAppCollectTableInfo._alias_)
.join(TABLE_VERSION) .join(TbAppCollectTableVersion._alias_)
.onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) .onEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A))
.andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A))
.whereGe(TABLE_INFO_PRIORITY, 10000) .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
.andEq(TABLE_VERSION_SCHEDULED, false) .andEq(TbAppCollectTableVersion.SCHEDULED_A, false)
.andEq(TABLE_VERSION_VERSION, version) .andEq(TbAppCollectTableVersion.VERSION_A, version)
.andEq(TABLE_INFO_STATUS, "y"); .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y);
} }
@Cacheable(value = "un-scheduled-focus-table", sync = true) @Cacheable(value = "un-scheduled-focus-table", sync = true)
@@ -519,7 +516,7 @@ public class InfoService {
public ImmutableList<JobIdAndAlias> unScheduledFocusTable(String version) { public ImmutableList<JobIdAndAlias> unScheduledFocusTable(String version) {
return Lists.immutable.ofAll( return Lists.immutable.ofAll(
mysqlJdbcTemplate.query( mysqlJdbcTemplate.query(
generateUnScheduledFocusTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) generateUnScheduledFocusTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version)
.build(), .build(),
(rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
) )
@@ -540,9 +537,9 @@ public class InfoService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long tableCount() { public Long tableCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct concat(src_schema, src_table))") SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A))
.from(TABLE_INFO) .from(TbAppCollectTableInfo._alias_)
.whereEq(TABLE_INFO_STATUS, "y") .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.build(), .build(),
Long.class Long.class
); );
@@ -552,10 +549,10 @@ public class InfoService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long tableFocusCount() { public Long tableFocusCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct concat(src_schema, src_table))") SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A))
.from(TABLE_INFO) .from(TbAppCollectTableInfo._alias_)
.whereEq(TABLE_INFO_STATUS, "y") .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andGe(TABLE_INFO_PRIORITY, 10000) .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
.build(), .build(),
Long.class Long.class
); );
@@ -565,9 +562,9 @@ public class InfoService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long hudiCount() { public Long hudiCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct tgt_hdfs_path) as count") SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A))
.from(TABLE_INFO) .from(TbAppCollectTableInfo._alias_)
.whereEq(TABLE_INFO_STATUS, "y") .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.build(), .build(),
Long.class Long.class
); );
@@ -577,10 +574,10 @@ public class InfoService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long hudiFocusCount() { public Long hudiFocusCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct tgt_hdfs_path) as count") SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A))
.from(TABLE_INFO) .from(TbAppCollectTableInfo._alias_)
.whereEq(TABLE_INFO_STATUS, "y") .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andGe(TABLE_INFO_PRIORITY, 10000) .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
.build(), .build(),
Long.class Long.class
); );
@@ -590,9 +587,9 @@ public class InfoService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long hiveCount() { public Long hiveCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct concat(hive_db, hive_table)) as count") SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A))
.from(TABLE_INFO) .from(TbAppCollectTableInfo._alias_)
.whereEq(TABLE_INFO_STATUS, "y") .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.build(), .build(),
Long.class Long.class
); );
@@ -602,10 +599,10 @@ public class InfoService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long hiveFocusCount() { public Long hiveFocusCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct concat(hive_db, hive_table)) as count") SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A))
.from(TABLE_INFO) .from(TbAppCollectTableInfo._alias_)
.whereEq(TABLE_INFO_STATUS, "y") .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andGe(TABLE_INFO_PRIORITY, 10000) .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
.build(), .build(),
Long.class Long.class
); );

View File

@@ -1,15 +1,12 @@
package com.test; package com.test;
import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.config.GlobalConfig;
import club.kingon.sql.builder.entry.Alias;
import club.kingon.sql.builder.entry.Column; import club.kingon.sql.builder.entry.Column;
import club.kingon.sql.builder.function.Functions; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.db.sql.SqlFormatter; import cn.hutool.db.sql.SqlUtil;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*;
import static com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.*;
/** /**
* @author lanyuanxiaoyao * @author lanyuanxiaoyao
@@ -17,9 +14,8 @@ import static com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.*;
*/ */
public class SqlBuilderTests { public class SqlBuilderTests {
public static void main(String[] args) { public static void main(String[] args) {
System.out.println(SqlFormatter.format( /*System.out.println(SqlFormatter.format(
SqlBuilder.select( SqlBuilder.select(
Functions.count(Column.as(DataSource.DS_NAME_A)),
DataSource.DS_NAME_A, DataSource.DS_NAME_A,
DataSource.SCHEMA_NAME_A, DataSource.SCHEMA_NAME_A,
DataSourceTable.TABLE_NAME_A, DataSourceTable.TABLE_NAME_A,
@@ -52,10 +48,10 @@ public class SqlBuilderTests {
TbAppCollectTableInfo.FILTER_TYPE_A, TbAppCollectTableInfo.FILTER_TYPE_A,
TbAppCollectTableInfo.SRC_TOPIC_A, TbAppCollectTableInfo.SRC_TOPIC_A,
TbAppCollectTableInfo.SRC_PULSAR_ADDR_A, TbAppCollectTableInfo.SRC_PULSAR_ADDR_A,
Alias.of(TbAppYarnJobConfigSync.JOB_MANAGER_MEMORY_A, "sync_job_manager_memory"), Alias.of("sync_config." + TbAppYarnJobConfig.JOB_MANAGER_MEMORY_O, "sync_job_manager_memory"),
Alias.of(TbAppYarnJobConfigSync.TASK_MANAGER_MEMORY_A, "sync_task_manager_memory"), Alias.of("sync_config." + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_O, "sync_task_manager_memory"),
Alias.of(TbAppYarnJobConfigCompaction.JOB_MANAGER_MEMORY_A, "compaction_job_manager_memory"), Alias.of("compaction_config." + TbAppYarnJobConfig.JOB_MANAGER_MEMORY_O, "compaction_job_manager_memory"),
Alias.of(TbAppYarnJobConfigCompaction.TASK_MANAGER_MEMORY_A, "compaction_task_manger_momory"), Alias.of("compaction_config." + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_O, "compaction_task_manger_momory"),
TbAppCollectTableInfo.PARTITION_FIELD_A, TbAppCollectTableInfo.PARTITION_FIELD_A,
TbAppHudiSyncState.MESSAGE_ID_A, TbAppHudiSyncState.MESSAGE_ID_A,
TbAppGlobalConfig.METRIC_PUBLISH_URL_A, TbAppGlobalConfig.METRIC_PUBLISH_URL_A,
@@ -84,8 +80,8 @@ public class SqlBuilderTests {
DataSourceTableField._alias_, DataSourceTableField._alias_,
TbAppFlinkJobConfig._alias_, TbAppFlinkJobConfig._alias_,
TbAppHudiJobConfig._alias_, TbAppHudiJobConfig._alias_,
TbAppYarnJobConfigSync._alias_, Alias.of(TbAppYarnJobConfig._origin_, "sync_config"),
TbAppYarnJobConfigCompaction._alias_, Alias.of(TbAppYarnJobConfig._origin_, "compaction_config"),
TbAppGlobalConfig._alias_, TbAppGlobalConfig._alias_,
TbAppCollectTableInfo._alias_ TbAppCollectTableInfo._alias_
) )
@@ -104,18 +100,38 @@ public class SqlBuilderTests {
.andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A)) .andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A))
.andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
.andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A)) .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A))
.andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfigSync.ID_A)) .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("sync_config." + TbAppYarnJobConfig.ID_O))
.andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfigCompaction.ID_A)) .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("compaction_config." + TbAppYarnJobConfig.ID_O))
.andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A)) .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A))
.andEq(TbAppFlinkJobConfig.ID_A, 1542097984132706304L) .andEq(TbAppFlinkJobConfig.ID_A, 1542097984132706304L)
.andEq(TbAppCollectTableInfo.ALIAS_A, "crm_cfguse_channel") .andEq(TbAppCollectTableInfo.ALIAS_A, "crm_cfguse_channel")
.andEq(TbAppCollectTableInfo.STATUS_A, "y") .andEq(TbAppCollectTableInfo.STATUS_A, "y")
.andEq(TbAppFlinkJobConfig.STATUS_A, "y") .andEq(TbAppFlinkJobConfig.STATUS_A, "y")
.andEq(TbAppHudiJobConfig.STATUS_A, "y") .andEq(TbAppHudiJobConfig.STATUS_A, "y")
.andEq(TbAppYarnJobConfigSync.STATUS_A, "y") .andEq("sync_config." + TbAppYarnJobConfig.STATUS_O, "y")
.andEq(TbAppYarnJobConfigCompaction.STATUS_A, "y") .andEq("compaction_config." + TbAppYarnJobConfig.STATUS_O, "y")
.orderBy(DataSourceTableField.FIELD_SEQ_A) .orderBy(DataSourceTableField.FIELD_SEQ_A)
.build() .build()
));*/
System.out.println(SqlUtil.formatSql(
SqlBuilder.select("*")
.from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_, TbAppCollectTableVersion._alias_)
.whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
.andEq(TbAppFlinkJobConfig.STATUS_A, "y")
.andEq(TbAppCollectTableInfo.STATUS_A, "y")
.andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A))
.andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A))
.andEq(TbAppCollectTableInfo.PRIORITY_A, 10000)
.andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false)
.andEq(TbAppCollectTableVersion.VERSION_A, "2018")
/*.join(TbAppCollectTableVersion._alias_)
.onEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A))
.andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A))
.whereLt(TbAppCollectTableInfo.PRIORITY_A, 10000)
.andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false)
.andEq(TbAppCollectTableVersion.VERSION_A, "2018")
.andEq(TbAppCollectTableInfo.STATUS_A, "y")*/
.build()
)); ));
} }
} }