feat(info-query): 使用tags来区分重点表与否

This commit is contained in:
v-zhangjc9
2024-07-10 16:22:27 +08:00
parent 6c9f43d310
commit aaa243d626
7 changed files with 78 additions and 24 deletions

View File

@@ -21,4 +21,15 @@ public class BaseService {
protected static String column(Alias table, String column) {
return StrUtil.format("{}.{}", table.getAlias(), column);
}
/**
* 用于构造匹配标签的列
*/
protected static String tagsMatchColumn(String column) {
return StrUtil.format("CONCAT(',',{},',')", column);
}
protected static String tagsMatchValue(String value) {
return StrUtil.format(",{},", value);
}
}

View File

@@ -11,6 +11,7 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.exception.ConfigException;
import com.lanyuanxiaoyao.service.common.exception.TableMetaNotFoundException;
@@ -30,8 +31,15 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.*;
import static com.lanyuanxiaoyao.service.common.SQLConstants.IapDatahub.*;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppGlobalConfig;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiJobConfig;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiSyncState;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppYarnJobConfig;
import static com.lanyuanxiaoyao.service.common.SQLConstants.IapDatahub.DataSource;
import static com.lanyuanxiaoyao.service.common.SQLConstants.IapDatahub.DataSourceTable;
import static com.lanyuanxiaoyao.service.common.SQLConstants.IapDatahub.DataSourceTableField;
/**
* Table Meta
@@ -65,28 +73,28 @@ public class TableMetaService extends BaseService {
checkMoreThanOne(fieldName, iterable);
}
public ImmutableList<TableMeta> tableMetaList(Long flinkJobId) {
return tableMetaList(flinkJobId, null);
}
public static ConditionSqlBuilder<WhereSqlBuilder> generateSimpleTableMetaList(SelectSqlBuilder builder) {
return generateSimpleTableMetaList(builder, null, null, null, null);
return generateSimpleTableMetaList(builder, null, null, null, null, null);
}
public static ConditionSqlBuilder<WhereSqlBuilder> generateSimpleTableMetaList(SelectSqlBuilder builder, Integer priority) {
return generateSimpleTableMetaList(builder, null, null, priority, null);
return generateSimpleTableMetaList(builder, null, null, priority, null, null);
}
public static ConditionSqlBuilder<WhereSqlBuilder> generateSimpleTableMetaList(SelectSqlBuilder builder, Boolean isFocus) {
return generateSimpleTableMetaList(builder, null, null, null, null, isFocus ? Constants.TAGS_FOCUS : null);
}
public static ConditionSqlBuilder<WhereSqlBuilder> generateSimpleTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText) {
return generateSimpleTableMetaList(builder, flinkJobId, aliasText, null, null);
return generateSimpleTableMetaList(builder, flinkJobId, aliasText, null, null, null);
}
public static ConditionSqlBuilder<WhereSqlBuilder> generateSimpleTableMetaList(SelectSqlBuilder builder, String hdfs) {
return generateSimpleTableMetaList(builder, null, null, null, hdfs);
return generateSimpleTableMetaList(builder, null, null, null, hdfs, null);
}
// 相比正式的table meta查询少了表字段的匹配大幅提高了查询速度
public static ConditionSqlBuilder<WhereSqlBuilder> generateSimpleTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText, Integer priority, String hdfs) {
public static ConditionSqlBuilder<WhereSqlBuilder> generateSimpleTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText, Integer priority, String hdfs, String tag) {
return builder
.from(
DataSource._alias_,
@@ -118,6 +126,7 @@ public class TableMetaService extends BaseService {
.andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText)
.andGe(ObjectUtil.isNotNull(priority), TbAppCollectTableInfo.PRIORITY_A, priority)
.andEq(StrUtil.isNotBlank(hdfs), TbAppCollectTableInfo.TGT_HDFS_PATH_A, hdfs)
.andLike(StrUtil.isNotBlank(tag), tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(tag))
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y)
@@ -125,6 +134,10 @@ public class TableMetaService extends BaseService {
.andEq("tayjc_compaction.status", STATUS_Y);
}
public ImmutableList<TableMeta> tableMetaList(Long flinkJobId) {
return tableMetaList(flinkJobId, null);
}
private ImmutableList<TableMeta> tableMetaList(Long flinkJobId, String aliasText) {
return Lists.immutable.ofAll(
mysqlJdbcTemplate.query(
@@ -529,7 +542,7 @@ public class TableMetaService extends BaseService {
return mysqlJdbcTemplate.queryForObject(
generateSimpleTableMetaList(
SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)),
10000
true
).build(),
Long.class
);
@@ -552,7 +565,7 @@ public class TableMetaService extends BaseService {
return mysqlJdbcTemplate.queryForObject(
generateSimpleTableMetaList(
SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)),
10000
true
).build(),
Long.class
);
@@ -575,7 +588,7 @@ public class TableMetaService extends BaseService {
return mysqlJdbcTemplate.queryForObject(
generateSimpleTableMetaList(
SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)),
10000
true
).build(),
Long.class
);

View File

@@ -7,6 +7,7 @@ import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated;
@@ -24,7 +25,10 @@ import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.*;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableVersion;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiSyncState;
/**
* Version
@@ -70,7 +74,7 @@ public class VersionService extends BaseService {
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A))
.andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A))
.andLt(TbAppCollectTableInfo.PRIORITY_A, 10000)
.andNotLike(tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(Constants.TAGS_FOCUS))
.andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false)
.andEq(TbAppCollectTableVersion.VERSION_A, version);
}
@@ -103,7 +107,7 @@ public class VersionService extends BaseService {
.join(TbAppCollectTableVersion._alias_)
.onEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A))
.andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A))
.whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
.whereLike(tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(Constants.TAGS_FOCUS))
.andEq(TbAppCollectTableVersion.SCHEDULED_A, false)
.andEq(TbAppCollectTableVersion.VERSION_A, version)
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y);
@@ -281,7 +285,7 @@ public class VersionService extends BaseService {
private SqlBuilder generateUnReceiveVersionNormalTableSql(SelectSqlBuilder builder, String version) {
return builder
.from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_)
.whereLt(TbAppCollectTableInfo.PRIORITY_A, 10000)
.whereNotLike(tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(Constants.TAGS_FOCUS))
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
@@ -318,7 +322,7 @@ public class VersionService extends BaseService {
private SqlBuilder generateUnReceiveVersionFocusTable(SelectSqlBuilder builder, String version) {
return builder
.from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_)
.whereGe(TbAppCollectTableInfo.PRIORITY_A, 10000)
.whereLike(tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(Constants.TAGS_FOCUS))
.andEq(TbAppCollectTableInfo.STATUS_A, "y")
.andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)

View File

@@ -1,13 +1,17 @@
package com.test;
import club.kingon.sql.builder.LMDFunction;
import club.kingon.sql.builder.SelectSqlBuilder;
import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.entry.Alias;
import club.kingon.sql.builder.entry.Column;
import club.kingon.sql.builder.enums.Operator;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.sql.SqlUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.SQLConstants;
import org.eclipse.collections.api.factory.Maps;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig;
@@ -67,10 +71,28 @@ public class SqlBuilderTests {
.orderBy(SQLConstants.IapDatahub.DataSourceTableField.FIELD_SEQ_A);
}
private static String tagsMatchColumn(String column) {
return StrUtil.format("CONCAT(',',{},',')", column);
}
private static String tagsMatchValue(String value) {
return StrUtil.format(",{},", value);
}
public static void main(String[] args) {
System.out.println(SqlUtil.formatSql(
SqlBuilder.delete(SQLConstants.HudiCollectBuild.TbAppCollectTableVersion._origin_)
.whereLe(SQLConstants.HudiCollectBuild.TbAppCollectTableVersion.UPDATE_TIME_O, Column.as("DATE_SUB(CURDATE(), INTERVAL 14 DAY)"))
SqlBuilder.selectAll()
.from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_)
.whereNotLike(tagsMatchColumn(TbAppCollectTableInfo.TAGS_A), tagsMatchValue(Constants.TAGS_FOCUS))
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andNotIn(
StrUtil.format("concat({}, {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A),
SqlBuilder.select(StrUtil.format("concat({}, {})", SQLConstants.HudiCollectBuild.TbAppCollectTableVersion.FLINK_JOB_ID_A, SQLConstants.HudiCollectBuild.TbAppCollectTableVersion.ALIAS_A))
.from(SQLConstants.HudiCollectBuild.TbAppCollectTableVersion._alias_)
.whereEq(SQLConstants.HudiCollectBuild.TbAppCollectTableVersion.VERSION_A, 1)
)
.build()
));
}