diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index a34134a..d8d1a95 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -205,7 +205,7 @@ public interface Constants { String TAGS_TRACE_LATEST_OP_TS = "TRACE_LATEST_OP_TS"; String TAGS_SOURCE_READER = "SOURCE_READER"; String TAGS_USE_TEST_JAR = "USE_TEST_JAR"; - String TAGS_ODS = "ODS"; + String TAGS_FOCUS = "FOCUS"; String TAGS_ODS_FOCUS = "ODS_FOCUS"; String TAGS_CRM_FOCUS = "CRM_FOCUS"; diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/BaseService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/BaseService.java index 1a1bced..6d5844b 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/BaseService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/BaseService.java @@ -21,4 +21,15 @@ public class BaseService { protected static String column(Alias table, String column) { return StrUtil.format("{}.{}", table.getAlias(), column); } + + /** + * 用于构造匹配标签的列 + */ + protected static String tagsMatchColumn(String column) { + return StrUtil.format("CONCAT(',',{},',')", column); + } + + protected static String tagsMatchValue(String value) { + return StrUtil.format(",{},", value); + } } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java index ed58847..c8340c7 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java @@ -11,6 +11,7 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; +import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.exception.ConfigException; import com.lanyuanxiaoyao.service.common.exception.TableMetaNotFoundException; @@ -30,8 +31,15 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.retry.annotation.Retryable; import org.springframework.stereotype.Service; -import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.*; -import static com.lanyuanxiaoyao.service.common.SQLConstants.IapDatahub.*; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppGlobalConfig; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiJobConfig; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiSyncState; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppYarnJobConfig; +import static com.lanyuanxiaoyao.service.common.SQLConstants.IapDatahub.DataSource; +import static com.lanyuanxiaoyao.service.common.SQLConstants.IapDatahub.DataSourceTable; +import static com.lanyuanxiaoyao.service.common.SQLConstants.IapDatahub.DataSourceTableField; /** * Table Meta @@ -65,28 +73,28 @@ public class TableMetaService extends BaseService { checkMoreThanOne(fieldName, iterable); } - public ImmutableList tableMetaList(Long flinkJobId) { - return tableMetaList(flinkJobId, null); - } - public static ConditionSqlBuilder generateSimpleTableMetaList(SelectSqlBuilder builder) { - return generateSimpleTableMetaList(builder, null, null, null, null); + return generateSimpleTableMetaList(builder, null, null, null, null, null); } public static ConditionSqlBuilder generateSimpleTableMetaList(SelectSqlBuilder builder, Integer priority) { - return generateSimpleTableMetaList(builder, null, null, priority, null); + return generateSimpleTableMetaList(builder, null, null, priority, null, null); + } + + public static ConditionSqlBuilder generateSimpleTableMetaList(SelectSqlBuilder builder, Boolean isFocus) { + return generateSimpleTableMetaList(builder, null, null, null, null, isFocus ? Constants.TAGS_FOCUS : null); } public static ConditionSqlBuilder generateSimpleTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText) { - return generateSimpleTableMetaList(builder, flinkJobId, aliasText, null, null); + return generateSimpleTableMetaList(builder, flinkJobId, aliasText, null, null, null); } public static ConditionSqlBuilder generateSimpleTableMetaList(SelectSqlBuilder builder, String hdfs) { - return generateSimpleTableMetaList(builder, null, null, null, hdfs); + return generateSimpleTableMetaList(builder, null, null, null, hdfs, null); } // 相比正式的table meta查询少了表字段的匹配,大幅提高了查询速度 - public static ConditionSqlBuilder generateSimpleTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText, Integer priority, String hdfs) { + public static ConditionSqlBuilder generateSimpleTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText, Integer priority, String hdfs, String tag) { return builder .from( DataSource._alias_, @@ -118,6 +126,7 @@ public class TableMetaService extends BaseService { .andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText) .andGe(ObjectUtil.isNotNull(priority), TbAppCollectTableInfo.PRIORITY_A, priority) .andEq(StrUtil.isNotBlank(hdfs), TbAppCollectTableInfo.TGT_HDFS_PATH_A, hdfs) + .andLike(StrUtil.isNotBlank(tag), tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(tag)) .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) .andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y) @@ -125,6 +134,10 @@ public class TableMetaService extends BaseService { .andEq("tayjc_compaction.status", STATUS_Y); } + public ImmutableList tableMetaList(Long flinkJobId) { + return tableMetaList(flinkJobId, null); + } + private ImmutableList tableMetaList(Long flinkJobId, String aliasText) { return Lists.immutable.ofAll( mysqlJdbcTemplate.query( @@ -529,7 +542,7 @@ public class TableMetaService extends BaseService { return mysqlJdbcTemplate.queryForObject( generateSimpleTableMetaList( SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)), - 10000 + true ).build(), Long.class ); @@ -552,7 +565,7 @@ public class TableMetaService extends BaseService { return mysqlJdbcTemplate.queryForObject( generateSimpleTableMetaList( SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)), - 10000 + true ).build(), Long.class ); @@ -575,7 +588,7 @@ public class TableMetaService extends BaseService { return mysqlJdbcTemplate.queryForObject( generateSimpleTableMetaList( SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)), - 10000 + true ).build(), Long.class ); diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/VersionService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/VersionService.java index 7a72585..e7b6c88 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/VersionService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/VersionService.java @@ -7,6 +7,7 @@ import club.kingon.sql.builder.entry.Column; import cn.hutool.core.lang.Pair; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; @@ -24,7 +25,10 @@ import org.springframework.retry.annotation.Retryable; import org.springframework.stereotype.Service; import org.springframework.transaction.support.TransactionTemplate; -import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.*; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableVersion; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiSyncState; /** * Version @@ -70,7 +74,7 @@ public class VersionService extends BaseService { .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) - .andLt(TbAppCollectTableInfo.PRIORITY_A, 10000) + .andNotLike(tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(Constants.TAGS_FOCUS)) .andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false) .andEq(TbAppCollectTableVersion.VERSION_A, version); } @@ -103,7 +107,7 @@ public class VersionService extends BaseService { .join(TbAppCollectTableVersion._alias_) .onEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) - .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000) + .whereLike(tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(Constants.TAGS_FOCUS)) .andEq(TbAppCollectTableVersion.SCHEDULED_A, false) .andEq(TbAppCollectTableVersion.VERSION_A, version) .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y); @@ -281,7 +285,7 @@ public class VersionService extends BaseService { private SqlBuilder generateUnReceiveVersionNormalTableSql(SelectSqlBuilder builder, String version) { return builder .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) - .whereLt(TbAppCollectTableInfo.PRIORITY_A, 10000) + .whereNotLike(tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(Constants.TAGS_FOCUS)) .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) @@ -318,7 +322,7 @@ public class VersionService extends BaseService { private SqlBuilder generateUnReceiveVersionFocusTable(SelectSqlBuilder builder, String version) { return builder .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) - .whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000) + .whereLike(tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(Constants.TAGS_FOCUS)) .andEq(TbAppCollectTableInfo.STATUS_A, "y") .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) diff --git a/service-info-query/src/test/java/com/test/SqlBuilderTests.java b/service-info-query/src/test/java/com/test/SqlBuilderTests.java index 5b0d81f..8b9ec6d 100644 --- a/service-info-query/src/test/java/com/test/SqlBuilderTests.java +++ b/service-info-query/src/test/java/com/test/SqlBuilderTests.java @@ -1,13 +1,17 @@ package com.test; +import club.kingon.sql.builder.LMDFunction; import club.kingon.sql.builder.SelectSqlBuilder; import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Column; +import club.kingon.sql.builder.enums.Operator; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.db.sql.SqlUtil; +import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.SQLConstants; +import org.eclipse.collections.api.factory.Maps; import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo; import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig; @@ -67,10 +71,28 @@ public class SqlBuilderTests { .orderBy(SQLConstants.IapDatahub.DataSourceTableField.FIELD_SEQ_A); } + private static String tagsMatchColumn(String column) { + return StrUtil.format("CONCAT(',',{},',')", column); + } + + private static String tagsMatchValue(String value) { + return StrUtil.format(",{},", value); + } + public static void main(String[] args) { System.out.println(SqlUtil.formatSql( - SqlBuilder.delete(SQLConstants.HudiCollectBuild.TbAppCollectTableVersion._origin_) - .whereLe(SQLConstants.HudiCollectBuild.TbAppCollectTableVersion.UPDATE_TIME_O, Column.as("DATE_SUB(CURDATE(), INTERVAL 14 DAY)")) + SqlBuilder.selectAll() + .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) + .whereNotLike(tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(Constants.TAGS_FOCUS)) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andNotIn( + StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A), + SqlBuilder.select(StrUtil.format("concat({}, {})", SQLConstants.HudiCollectBuild.TbAppCollectTableVersion.FLINK_JOB_ID_A, SQLConstants.HudiCollectBuild.TbAppCollectTableVersion.ALIAS_A)) + .from(SQLConstants.HudiCollectBuild.TbAppCollectTableVersion._alias_) + .whereEq(SQLConstants.HudiCollectBuild.TbAppCollectTableVersion.VERSION_A, 1) + ) .build() )); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java index cdc7837..b775e05 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java @@ -1,6 +1,8 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.compaction; import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.common.Constants; +import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; import com.lanyuanxiaoyao.service.forest.service.HudiService; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper; @@ -44,7 +46,7 @@ public class FocusScheduleJob extends BaseScheduleJob { infoService, hudiService, mapper, - meta -> meta.getPriority() >= 10000, + meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_FOCUS), comment ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java index e4d5fbb..425b34b 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java @@ -2,6 +2,8 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.compaction; import cn.hutool.core.util.StrUtil; import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.common.Constants; +import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; import com.lanyuanxiaoyao.service.forest.service.HudiService; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper; @@ -47,7 +49,7 @@ public class FocusUnVersionUpdateScheduleJob extends BaseScheduleJob { infoService, hudiService, mapper, - meta -> meta.getPriority() >= 10000 + meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_FOCUS) && unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias())), comment );