diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index cfefa24..fc7fec3 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -8,7 +8,6 @@ import club.kingon.sql.builder.entry.Column; import cn.hutool.core.lang.Pair; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; -import com.eshore.odcp.hudi.connector.SQLConstants; import com.eshore.odcp.hudi.connector.entity.FlinkJob; import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; @@ -16,8 +15,6 @@ import com.eshore.odcp.hudi.connector.utils.database.DatabaseService; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.*; import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider; -import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Timestamp; import java.util.List; import org.eclipse.collections.api.factory.Lists; @@ -28,15 +25,11 @@ import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.Cacheable; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.RowMapper; import org.springframework.retry.annotation.Retryable; import org.springframework.stereotype.Service; import org.springframework.transaction.support.TransactionTemplate; -import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME; import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; -import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo; -import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig; /** * @author lanyuanxiaoyao @@ -48,29 +41,8 @@ import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbApp public class InfoService { private static final Logger logger = LoggerFactory.getLogger(InfoService.class); private static final String COUNT = "count(*)"; - private static final Alias TABLE_VERSION = Alias.of(StrUtil.format("{}.tb_app_collect_table_version", DATABASE_NAME), "tactv"); - private static final String TABLE_VERSION_FLINK_JOB_ID = column(TABLE_VERSION, "flink_job_id"); - private static final String TABLE_VERSION_ALIAS = column(TABLE_VERSION, "alias"); - private static final String TABLE_VERSION_VERSION = column(TABLE_VERSION, "version"); - private static final String TABLE_VERSION_SCHEDULED = column(TABLE_VERSION, "scheduled"); - private static final Alias TABLE_INFO = Alias.of(StrUtil.format("{}.tb_app_collect_table_info", DATABASE_NAME), "tacti"); - private static final String TABLE_INFO_FLINK_JOB_ID = column(TABLE_INFO, "flink_job_id"); - private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias"); - private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority"); - private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status"); - private static final String TABLE_INFO_TARGET_HDFS = column(TABLE_INFO, "tgt_hdfs_path"); - private static final String TABLE_INFO_TARGET_TABLE_TYPE = column(TABLE_INFO, "tgt_table_type"); - private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss"); - private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id"); - private static final String TABLE_SYNC_STATE_COMPACTION_STATE = column(TABLE_SYNC_STATE, "compaction_status"); - private static final Alias TABLE_COMPACTION_METRICS = Alias.of(StrUtil.format("{}.tb_app_hudi_compaction_metrics", DATABASE_NAME), "tahcm"); - private static final String TABLE_COMPACTION_METRICS_TYPE = column(TABLE_COMPACTION_METRICS, "type"); - private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id"); - private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias"); - private static final Alias TABLE_FLINK_JOB = Alias.of(StrUtil.format("{}.tb_app_flink_job_config", DATABASE_NAME), "tafjc"); - private static final String TABLE_FLINK_JOB_ID = column(TABLE_FLINK_JOB, "id"); - private static final String TABLE_FLINK_JOB_STATUS = column(TABLE_FLINK_JOB, "status"); - private static final String TABLE_FLINK_JOB_RUN_MODE = column(TABLE_FLINK_JOB, "run_mode"); + private static final String STATUS_Y = "y"; + private static final String STATUS_N = "n"; private final DatabaseService databaseService; private final JdbcTemplate mysqlJdbcTemplate; private final TransactionTemplate mysqlTransactionTemplate; @@ -84,10 +56,18 @@ public class InfoService { } private static String generateVersionTableIdCriteria(Boolean scheduled) { - return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) - .from(TABLE_VERSION) - .whereEq(TABLE_VERSION_SCHEDULED, scheduled) - .andEq(TABLE_VERSION_VERSION, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')")) + return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)) + .from( + TbAppCollectTableVersion._alias_, + TbAppFlinkJobConfig._alias_, + TbAppCollectTableInfo._alias_ + ) + .whereEq(TbAppCollectTableVersion.SCHEDULED_A, scheduled) + .andEq(TbAppCollectTableVersion.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) + .andEq(TbAppCollectTableVersion.ALIAS_A, Column.as(TbAppCollectTableInfo.ALIAS_A)) + .andEq(TbAppCollectTableVersion.VERSION_A, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')")) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) .build(); } @@ -116,14 +96,20 @@ public class InfoService { ) { int limit = Math.max(count, 1); int offset = limit * Math.max(page - 1, 0); - return builder.from(TABLE_FLINK_JOB, TABLE_INFO, TABLE_SYNC_STATE) - .whereEq(TABLE_FLINK_JOB_ID, Column.as(TABLE_INFO_FLINK_JOB_ID)) - .andEq(TABLE_SYNC_STATE_ID, Column.as(StrUtil.format("concat({}, '-', {})", TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS))) - .andLike(ObjectUtil.isNotNull(flinkJobId), TABLE_FLINK_JOB_ID, flinkJobId) - .andLike(ObjectUtil.isNotNull(alias), TABLE_INFO_ALIAS, alias) - .andIn(ObjectUtil.isNotEmpty(selectHudiTableType), TABLE_INFO_TARGET_TABLE_TYPE, selectHudiTableType) - .andIn(ObjectUtil.isNotEmpty(selectedRunMode), TABLE_FLINK_JOB_RUN_MODE, selectedRunMode) - .andIn(ObjectUtil.isNotEmpty(selectedCompactionStatus), TABLE_SYNC_STATE_COMPACTION_STATE, selectedCompactionStatus) + return builder.from( + TbAppFlinkJobConfig._alias_, + TbAppCollectTableInfo._alias_, + TbAppHudiSyncState._alias_ + ) + .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) + .andLike(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) + .andLike(ObjectUtil.isNotNull(alias), TbAppCollectTableInfo.ALIAS_A, alias) + .andIn(ObjectUtil.isNotEmpty(selectHudiTableType), TbAppCollectTableInfo.TGT_TABLE_A, selectHudiTableType) + .andIn(ObjectUtil.isNotEmpty(selectedRunMode), TbAppFlinkJobConfig.RUN_MODE_A, selectedRunMode) + .andIn(ObjectUtil.isNotEmpty(selectedCompactionStatus), TbAppHudiSyncState.COMPACTION_STATUS_A, selectedCompactionStatus) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) .limit(limited, offset, limit); } @@ -203,7 +189,7 @@ public class InfoService { Long.class ); String listSQL = generateJobIdAndAliasCriteria( - SqlBuilder.select(TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS), + SqlBuilder.select(TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A), page, count, flinkJobId, @@ -294,19 +280,25 @@ public class InfoService { ) { int limit = Math.max(count, 1); int offset = limit * Math.max(page - 1, 0); - WhereSqlBuilder root = builder.from(TABLE_INFO) - .join(TABLE_VERSION) - .onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) - .andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) - .andEq(TABLE_INFO_STATUS, "y") - .join(TABLE_SYNC_STATE) - .on(StrUtil.format("{} = CONCAT({}, '-', {})", TABLE_SYNC_STATE_ID, TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS)) - .whereLike(ObjectUtil.isNotNull(flinkJobId), TABLE_INFO_FLINK_JOB_ID, flinkJobId) - .andLike(StrUtil.isNotBlank(alias), TABLE_INFO_ALIAS, alias) - .andEq(StrUtil.isNotBlank(version), TABLE_VERSION_VERSION, version) - .andIn(ObjectUtil.isNotEmpty(filterSchedules), TABLE_VERSION_SCHEDULED, filterSchedules); + WhereSqlBuilder root = builder + .from( + TbAppFlinkJobConfig._alias_, + TbAppCollectTableInfo._alias_, + TbAppCollectTableVersion._alias_, + TbAppHudiSyncState._alias_ + ) + .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.ALIAS_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, "y") + .andEq(TbAppCollectTableInfo.STATUS_A, "y") + .andEq(TbAppCollectTableVersion.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) + .andEq(TbAppCollectTableVersion.ALIAS_A, Column.as(TbAppCollectTableInfo.ALIAS_A)) + .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("CONCAT({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) + .andLike(ObjectUtil.isNotNull(flinkJobId), TbAppCollectTableInfo.FLINK_JOB_ID_A, flinkJobId) + .andLike(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) + .andEq(StrUtil.isNotBlank(version), TbAppCollectTableVersion.VERSION_A, version) + .andIn(ObjectUtil.isNotEmpty(filterSchedules), TbAppCollectTableVersion.SCHEDULED_A, filterSchedules); if (groupBy) { - return root.groupBy(TABLE_VERSION_SCHEDULED); + return root.groupBy(TbAppCollectTableVersion.SCHEDULED_A); } else { return root .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) @@ -345,7 +337,7 @@ public class InfoService { ); List> groupMap = mysqlJdbcTemplate.query( generateVersionTableCriteria( - SqlBuilder.select(TABLE_VERSION_SCHEDULED, COUNT), + SqlBuilder.select(TbAppCollectTableVersion.SCHEDULED_A, COUNT), page, count, version, @@ -366,10 +358,10 @@ public class InfoService { .toImmutable(); String listSQL = generateVersionTableCriteria( SqlBuilder.select( - TABLE_INFO_FLINK_JOB_ID, - TABLE_INFO_ALIAS, - TABLE_VERSION_VERSION, - TABLE_VERSION_SCHEDULED + TbAppCollectTableInfo.FLINK_JOB_ID_A, + TbAppCollectTableInfo.ALIAS_A, + TbAppCollectTableVersion.VERSION_A, + TbAppCollectTableVersion.SCHEDULED_A ), page, count, @@ -400,14 +392,16 @@ public class InfoService { private SqlBuilder generateUnReceiveVersionNormalTableSql(SelectSqlBuilder builder, String version) { return builder - .from(TABLE_INFO) - .whereLt(TABLE_INFO_PRIORITY, 10000) - .andEq(TABLE_INFO_STATUS, "y") + .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) + .whereLt(TbAppCollectTableInfo.PRIORITY_A, 10000) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) .andNotIn( - StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), - SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) - .from(TABLE_VERSION) - .whereEq(TABLE_VERSION_VERSION, version) + StrUtil.format("concat({}, {})", TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), + SqlBuilder.select(StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableVersion.ALIAS_A)) + .from(TbAppCollectTableVersion._alias_) + .whereEq(TbAppCollectTableVersion.VERSION_A, 1) ); } @@ -416,7 +410,7 @@ public class InfoService { public ImmutableList unReceiveVersionNormalTable(String version) { return Lists.immutable.ofAll( mysqlJdbcTemplate.query( - generateUnReceiveVersionNormalTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) + generateUnReceiveVersionNormalTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) .build(), (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) ) @@ -435,14 +429,16 @@ public class InfoService { private SqlBuilder generateUnReceiveVersionFocusTable(SelectSqlBuilder builder, String version) { return builder - .from(TABLE_INFO) - .whereGe(TABLE_INFO_PRIORITY, 10000) - .andEq(TABLE_INFO_STATUS, "y") + .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) + .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000) + .andEq(TbAppCollectTableInfo.STATUS_A, "y") + .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) .andNotIn( - StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), - SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) - .from(TABLE_VERSION) - .whereEq(TABLE_VERSION_VERSION, version) + StrUtil.format("concat({}, {})", TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), + SqlBuilder.select(StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableVersion.ALIAS_A)) + .from(TbAppCollectTableVersion._alias_) + .whereEq(TbAppCollectTableVersion.VERSION_A, version) ); } @@ -451,7 +447,7 @@ public class InfoService { public ImmutableList unReceiveVersionFocusTable(String version) { return Lists.immutable.ofAll( mysqlJdbcTemplate.query( - generateUnReceiveVersionFocusTable(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) + generateUnReceiveVersionFocusTable(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) .build(), (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) ) @@ -470,14 +466,15 @@ public class InfoService { private SqlBuilder generateUnScheduledNormalTableSql(SelectSqlBuilder builder, String version) { return builder - .from(TABLE_INFO) - .join(TABLE_VERSION) - .onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) - .andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) - .whereLt(TABLE_INFO_PRIORITY, 10000) - .andEq(TABLE_VERSION_SCHEDULED, false) - .andEq(TABLE_VERSION_VERSION, version) - .andEq(TABLE_INFO_STATUS, "y"); + .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_, TbAppCollectTableVersion._alias_) + .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, "y") + .andEq(TbAppCollectTableInfo.STATUS_A, "y") + .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) + .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) + .andEq(TbAppCollectTableInfo.PRIORITY_A, 10000) + .andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false) + .andEq(TbAppCollectTableVersion.VERSION_A, "2018"); } @Cacheable(value = "un-scheduled-normal-table", sync = true) @@ -485,7 +482,7 @@ public class InfoService { public ImmutableList unScheduledNormalTable(String version) { return Lists.immutable.ofAll( mysqlJdbcTemplate.query( - generateUnScheduledNormalTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) + generateUnScheduledNormalTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) .build(), (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) ) @@ -504,14 +501,14 @@ public class InfoService { private SqlBuilder generateUnScheduledFocusTableSql(SelectSqlBuilder builder, String version) { return builder - .from(TABLE_INFO) - .join(TABLE_VERSION) - .onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) - .andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) - .whereGe(TABLE_INFO_PRIORITY, 10000) - .andEq(TABLE_VERSION_SCHEDULED, false) - .andEq(TABLE_VERSION_VERSION, version) - .andEq(TABLE_INFO_STATUS, "y"); + .from(TbAppCollectTableInfo._alias_) + .join(TbAppCollectTableVersion._alias_) + .onEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) + .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) + .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000) + .andEq(TbAppCollectTableVersion.SCHEDULED_A, false) + .andEq(TbAppCollectTableVersion.VERSION_A, version) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y); } @Cacheable(value = "un-scheduled-focus-table", sync = true) @@ -519,7 +516,7 @@ public class InfoService { public ImmutableList unScheduledFocusTable(String version) { return Lists.immutable.ofAll( mysqlJdbcTemplate.query( - generateUnScheduledFocusTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version) + generateUnScheduledFocusTableSql(SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A), version) .build(), (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) ) @@ -540,9 +537,9 @@ public class InfoService { @Retryable(Throwable.class) public Long tableCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select("count(distinct concat(src_schema, src_table))") - .from(TABLE_INFO) - .whereEq(TABLE_INFO_STATUS, "y") + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) .build(), Long.class ); @@ -552,10 +549,10 @@ public class InfoService { @Retryable(Throwable.class) public Long tableFocusCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select("count(distinct concat(src_schema, src_table))") - .from(TABLE_INFO) - .whereEq(TABLE_INFO_STATUS, "y") - .andGe(TABLE_INFO_PRIORITY, 10000) + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) .build(), Long.class ); @@ -565,9 +562,9 @@ public class InfoService { @Retryable(Throwable.class) public Long hudiCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select("count(distinct tgt_hdfs_path) as count") - .from(TABLE_INFO) - .whereEq(TABLE_INFO_STATUS, "y") + SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) .build(), Long.class ); @@ -577,10 +574,10 @@ public class InfoService { @Retryable(Throwable.class) public Long hudiFocusCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select("count(distinct tgt_hdfs_path) as count") - .from(TABLE_INFO) - .whereEq(TABLE_INFO_STATUS, "y") - .andGe(TABLE_INFO_PRIORITY, 10000) + SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) .build(), Long.class ); @@ -590,9 +587,9 @@ public class InfoService { @Retryable(Throwable.class) public Long hiveCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select("count(distinct concat(hive_db, hive_table)) as count") - .from(TABLE_INFO) - .whereEq(TABLE_INFO_STATUS, "y") + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) .build(), Long.class ); @@ -602,10 +599,10 @@ public class InfoService { @Retryable(Throwable.class) public Long hiveFocusCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select("count(distinct concat(hive_db, hive_table)) as count") - .from(TABLE_INFO) - .whereEq(TABLE_INFO_STATUS, "y") - .andGe(TABLE_INFO_PRIORITY, 10000) + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)) + .from(TbAppCollectTableInfo._alias_) + .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) .build(), Long.class ); diff --git a/service-info-query/src/test/java/com/test/SqlBuilderTests.java b/service-info-query/src/test/java/com/test/SqlBuilderTests.java index 117c7d6..8b69f2a 100644 --- a/service-info-query/src/test/java/com/test/SqlBuilderTests.java +++ b/service-info-query/src/test/java/com/test/SqlBuilderTests.java @@ -1,15 +1,12 @@ package com.test; import club.kingon.sql.builder.SqlBuilder; -import club.kingon.sql.builder.config.GlobalConfig; -import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Column; -import club.kingon.sql.builder.function.Functions; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; -import cn.hutool.db.sql.SqlFormatter; +import cn.hutool.db.sql.SqlUtil; import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; -import static com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.*; /** * @author lanyuanxiaoyao @@ -17,9 +14,8 @@ import static com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.*; */ public class SqlBuilderTests { public static void main(String[] args) { - System.out.println(SqlFormatter.format( + /*System.out.println(SqlFormatter.format( SqlBuilder.select( - Functions.count(Column.as(DataSource.DS_NAME_A)), DataSource.DS_NAME_A, DataSource.SCHEMA_NAME_A, DataSourceTable.TABLE_NAME_A, @@ -52,10 +48,10 @@ public class SqlBuilderTests { TbAppCollectTableInfo.FILTER_TYPE_A, TbAppCollectTableInfo.SRC_TOPIC_A, TbAppCollectTableInfo.SRC_PULSAR_ADDR_A, - Alias.of(TbAppYarnJobConfigSync.JOB_MANAGER_MEMORY_A, "sync_job_manager_memory"), - Alias.of(TbAppYarnJobConfigSync.TASK_MANAGER_MEMORY_A, "sync_task_manager_memory"), - Alias.of(TbAppYarnJobConfigCompaction.JOB_MANAGER_MEMORY_A, "compaction_job_manager_memory"), - Alias.of(TbAppYarnJobConfigCompaction.TASK_MANAGER_MEMORY_A, "compaction_task_manger_momory"), + Alias.of("sync_config." + TbAppYarnJobConfig.JOB_MANAGER_MEMORY_O, "sync_job_manager_memory"), + Alias.of("sync_config." + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_O, "sync_task_manager_memory"), + Alias.of("compaction_config." + TbAppYarnJobConfig.JOB_MANAGER_MEMORY_O, "compaction_job_manager_memory"), + Alias.of("compaction_config." + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_O, "compaction_task_manger_momory"), TbAppCollectTableInfo.PARTITION_FIELD_A, TbAppHudiSyncState.MESSAGE_ID_A, TbAppGlobalConfig.METRIC_PUBLISH_URL_A, @@ -84,8 +80,8 @@ public class SqlBuilderTests { DataSourceTableField._alias_, TbAppFlinkJobConfig._alias_, TbAppHudiJobConfig._alias_, - TbAppYarnJobConfigSync._alias_, - TbAppYarnJobConfigCompaction._alias_, + Alias.of(TbAppYarnJobConfig._origin_, "sync_config"), + Alias.of(TbAppYarnJobConfig._origin_, "compaction_config"), TbAppGlobalConfig._alias_, TbAppCollectTableInfo._alias_ ) @@ -104,18 +100,38 @@ public class SqlBuilderTests { .andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A)) .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A)) - .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfigSync.ID_A)) - .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfigCompaction.ID_A)) + .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("sync_config." + TbAppYarnJobConfig.ID_O)) + .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("compaction_config." + TbAppYarnJobConfig.ID_O)) .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A)) .andEq(TbAppFlinkJobConfig.ID_A, 1542097984132706304L) .andEq(TbAppCollectTableInfo.ALIAS_A, "crm_cfguse_channel") .andEq(TbAppCollectTableInfo.STATUS_A, "y") .andEq(TbAppFlinkJobConfig.STATUS_A, "y") .andEq(TbAppHudiJobConfig.STATUS_A, "y") - .andEq(TbAppYarnJobConfigSync.STATUS_A, "y") - .andEq(TbAppYarnJobConfigCompaction.STATUS_A, "y") + .andEq("sync_config." + TbAppYarnJobConfig.STATUS_O, "y") + .andEq("compaction_config." + TbAppYarnJobConfig.STATUS_O, "y") .orderBy(DataSourceTableField.FIELD_SEQ_A) .build() + ));*/ + System.out.println(SqlUtil.formatSql( + SqlBuilder.select("*") + .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_, TbAppCollectTableVersion._alias_) + .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, "y") + .andEq(TbAppCollectTableInfo.STATUS_A, "y") + .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) + .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) + .andEq(TbAppCollectTableInfo.PRIORITY_A, 10000) + .andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false) + .andEq(TbAppCollectTableVersion.VERSION_A, "2018") + /*.join(TbAppCollectTableVersion._alias_) + .onEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) + .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) + .whereLt(TbAppCollectTableInfo.PRIORITY_A, 10000) + .andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false) + .andEq(TbAppCollectTableVersion.VERSION_A, "2018") + .andEq(TbAppCollectTableInfo.STATUS_A, "y")*/ + .build() )); } }