fix(info-query): 修复查询表详情不严谨导致将不完整的表也当作正常表使用

不完整的表插入压缩队列后导致处理队列的时间变长
This commit is contained in:
2024-01-24 12:47:56 +08:00
parent a7203977d5
commit aa3863de17
2 changed files with 161 additions and 194 deletions

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.info.service; package com.lanyuanxiaoyao.service.info.service;
import club.kingon.sql.builder.SelectSqlBuilder;
import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Alias;
import club.kingon.sql.builder.entry.Column; import club.kingon.sql.builder.entry.Column;
@@ -65,9 +66,69 @@ public class TableMetaService extends BaseService {
return tableMetaList(flinkJobId, null); return tableMetaList(flinkJobId, null);
} }
private SqlBuilder generateTableMetaList(SelectSqlBuilder builder) {
return generateTableMetaList(builder, null, null, null, null);
}
private SqlBuilder generateTableMetaList(SelectSqlBuilder builder, Integer priority) {
return generateTableMetaList(builder, null, null, priority, null);
}
private SqlBuilder generateTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText) {
return generateTableMetaList(builder, flinkJobId, aliasText, null, null);
}
private SqlBuilder generateTableMetaList(SelectSqlBuilder builder, String hdfs) {
return generateTableMetaList(builder, null, null, null, hdfs);
}
private SqlBuilder generateTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText, Integer priority, String hdfs) {
return builder
.from(
DataSource._alias_,
DataSourceTable._alias_,
DataSourceTableField._alias_,
TbAppFlinkJobConfig._alias_,
TbAppHudiJobConfig._alias_,
Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"),
Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"),
TbAppGlobalConfig._alias_,
TbAppCollectTableInfo._alias_,
TbAppHudiSyncState._alias_
)
.whereEq(DataSource.DS_ROLE_A, "src")
.andEq(DataSource.DS_STATE_A, STATUS_Y)
.andEq(DataSource.RECORD_STATE_A, STATUS_Y)
.andEq(DataSourceTable.DS_ID_A, Column.as(DataSource.DS_ID_A))
.andEq(DataSourceTable.RECORD_STATE_A, STATUS_Y)
.andEq(DataSourceTableField.TABLE_ID_A, Column.as(DataSourceTable.TABLE_ID_A))
.andEq(DataSourceTableField.RECORD_STATE_A, STATUS_Y)
.andIn(DataSource.DS_TYPE_A, "udal", "telepg")
.andEq(DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A))
.andEq(DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A))
.andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A))
.andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
.andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A))
.andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id"))
.andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id"))
.andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A))
.andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
.andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
.andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText)
.andEq(ObjectUtil.isNotNull(priority), TbAppCollectTableInfo.PRIORITY_A, priority)
.andEq(StrUtil.isNotBlank(hdfs), TbAppCollectTableInfo.TGT_HDFS_PATH_A, hdfs)
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y)
.andEq("tayjc_sync.status", STATUS_Y)
.andEq("tayjc_compaction.status", STATUS_Y)
.orderBy(DataSourceTableField.FIELD_SEQ_A);
}
private ImmutableList<TableMeta> tableMetaList(Long flinkJobId, String aliasText) { private ImmutableList<TableMeta> tableMetaList(Long flinkJobId, String aliasText) {
return Lists.immutable.ofAll( return Lists.immutable.ofAll(
mysqlJdbcTemplate.query( mysqlJdbcTemplate.query(
generateTableMetaList(
SqlBuilder.select( SqlBuilder.select(
DataSource.DS_NAME_A, DataSource.DS_NAME_A,
DataSource.SCHEMA_NAME_A, DataSource.SCHEMA_NAME_A,
@@ -128,45 +189,10 @@ public class TableMetaService extends BaseService {
TbAppGlobalConfig.ZK_URL_A, TbAppGlobalConfig.ZK_URL_A,
TbAppCollectTableInfo.VERSION_A, TbAppCollectTableInfo.VERSION_A,
DataSourceTableField.SCALE_A DataSourceTableField.SCALE_A
) ),
.from( flinkJobId,
DataSource._alias_, aliasText
DataSourceTable._alias_, ).build(),
DataSourceTableField._alias_,
TbAppFlinkJobConfig._alias_,
TbAppHudiJobConfig._alias_,
Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"),
Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"),
TbAppGlobalConfig._alias_,
TbAppCollectTableInfo._alias_,
TbAppHudiSyncState._alias_
)
.whereEq(DataSource.DS_ROLE_A, "src")
.andEq(DataSource.DS_STATE_A, STATUS_Y)
.andEq(DataSource.RECORD_STATE_A, STATUS_Y)
.andEq(DataSourceTable.DS_ID_A, Column.as(DataSource.DS_ID_A))
.andEq(DataSourceTable.RECORD_STATE_A, STATUS_Y)
.andEq(DataSourceTableField.TABLE_ID_A, Column.as(DataSourceTable.TABLE_ID_A))
.andEq(DataSourceTableField.RECORD_STATE_A, STATUS_Y)
.andIn(DataSource.DS_TYPE_A, "udal", "telepg")
.andEq(DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A))
.andEq(DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A))
.andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A))
.andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
.andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A))
.andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id"))
.andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id"))
.andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A))
.andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
.andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
.andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText)
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y)
.andEq("tayjc_sync.status", STATUS_Y)
.andEq("tayjc_compaction.status", STATUS_Y)
.orderBy(DataSourceTableField.FIELD_SEQ_A)
.build(),
(rs, row) -> TableMeta.RowMeta.builder() (rs, row) -> TableMeta.RowMeta.builder()
.dsName(rs.getString(1)) .dsName(rs.getString(1))
.schemaName(rs.getString(2)) .schemaName(rs.getString(2))
@@ -456,10 +482,9 @@ public class TableMetaService extends BaseService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long tableCount() { public Long tableCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
generateTableMetaList(
SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)) SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A))
.from(TbAppCollectTableInfo._alias_) ).build(),
.whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.build(),
Long.class Long.class
); );
} }
@@ -468,11 +493,10 @@ public class TableMetaService extends BaseService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long tableFocusCount() { public Long tableFocusCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)) generateTableMetaList(
.from(TbAppCollectTableInfo._alias_) SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)),
.whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) 10000
.andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) ).build(),
.build(),
Long.class Long.class
); );
} }
@@ -481,10 +505,9 @@ public class TableMetaService extends BaseService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long hudiCount() { public Long hudiCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
generateTableMetaList(
SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)) SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A))
.from(TbAppCollectTableInfo._alias_) ).build(),
.whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.build(),
Long.class Long.class
); );
} }
@@ -493,11 +516,10 @@ public class TableMetaService extends BaseService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long hudiFocusCount() { public Long hudiFocusCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)) generateTableMetaList(
.from(TbAppCollectTableInfo._alias_) SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)),
.whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) 10000
.andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) ).build(),
.build(),
Long.class Long.class
); );
} }
@@ -506,10 +528,9 @@ public class TableMetaService extends BaseService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long hiveCount() { public Long hiveCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
generateTableMetaList(
SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)) SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A))
.from(TbAppCollectTableInfo._alias_) ).build(),
.whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.build(),
Long.class Long.class
); );
} }
@@ -518,11 +539,10 @@ public class TableMetaService extends BaseService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Long hiveFocusCount() { public Long hiveFocusCount() {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)) generateTableMetaList(
.from(TbAppCollectTableInfo._alias_) SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)),
.whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) 10000
.andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) ).build(),
.build(),
Long.class Long.class
); );
} }
@@ -532,8 +552,8 @@ public class TableMetaService extends BaseService {
public ImmutableList<SimpleTableMeta> simpleTableMetas(Long flinkJobId, String alias) { public ImmutableList<SimpleTableMeta> simpleTableMetas(Long flinkJobId, String alias) {
return Lists.immutable.ofAll( return Lists.immutable.ofAll(
mysqlJdbcTemplate.query( mysqlJdbcTemplate.query(
SqlBuilder generateTableMetaList(
.select( SqlBuilder.select(
TbAppFlinkJobConfig.ID_A, TbAppFlinkJobConfig.ID_A,
TbAppFlinkJobConfig.NAME_A, TbAppFlinkJobConfig.NAME_A,
TbAppCollectTableInfo.ALIAS_A, TbAppCollectTableInfo.ALIAS_A,
@@ -542,14 +562,10 @@ public class TableMetaService extends BaseService {
TbAppCollectTableInfo.TGT_DB_A, TbAppCollectTableInfo.TGT_DB_A,
TbAppCollectTableInfo.TGT_TABLE_A, TbAppCollectTableInfo.TGT_TABLE_A,
TbAppCollectTableInfo.TGT_HDFS_PATH_A TbAppCollectTableInfo.TGT_HDFS_PATH_A
) ),
.from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) flinkJobId,
.whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) alias
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) ).build(),
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
.andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias)
.build(),
(rs, row) -> new SimpleTableMeta( (rs, row) -> new SimpleTableMeta(
rs.getLong(1), rs.getLong(1),
rs.getString(2), rs.getString(2),
@@ -568,14 +584,11 @@ public class TableMetaService extends BaseService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Boolean existsTable(Long flinkJobId, String alias) { public Boolean existsTable(Long flinkJobId, String alias) {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(*) > 0") generateTableMetaList(
.from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_) SqlBuilder.select("count(*) > 0"),
.whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) flinkJobId,
.andEq(TbAppFlinkJobConfig.ID_A, flinkJobId) alias
.andEq(TbAppCollectTableInfo.ALIAS_A, alias) ).build(),
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.build(),
Boolean.class Boolean.class
); );
} }
@@ -584,13 +597,10 @@ public class TableMetaService extends BaseService {
@Retryable(Throwable.class) @Retryable(Throwable.class)
public Boolean existsTableByHdfs(String hdfs) { public Boolean existsTableByHdfs(String hdfs) {
return mysqlJdbcTemplate.queryForObject( return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(*) > 0") generateTableMetaList(
.from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_) SqlBuilder.select("count(*) > 0"),
.whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) hdfs
.andEq(TbAppCollectTableInfo.TGT_HDFS_PATH_A, hdfs) ).build(),
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.build(),
Boolean.class Boolean.class
); );
} }

View File

@@ -1,5 +1,6 @@
package com.test; package com.test;
import club.kingon.sql.builder.SelectSqlBuilder;
import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Alias;
import club.kingon.sql.builder.entry.Column; import club.kingon.sql.builder.entry.Column;
@@ -9,81 +10,19 @@ import cn.hutool.db.sql.SqlUtil;
import com.eshore.odcp.hudi.connector.SQLConstants; import com.eshore.odcp.hudi.connector.SQLConstants;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppYarnJobConfig;
/** /**
* @author lanyuanxiaoyao * @author lanyuanxiaoyao
* @date 2023-06-07 * @date 2023-06-07
*/ */
public class SqlBuilderTests { public class SqlBuilderTests {
public static void main(String[] args) { private static Long flinkJobId = 100086L;
Long flinkJobId = 100086L; private static String alias = "hello_world";
String alias = "hello_world"; private static String STATUS_Y = "y";
String STATUS_Y = "y"; private static String STATUS_N = "n";
String STATUS_N = "n";
System.out.println(SqlUtil.formatSql( private static SqlBuilder generateTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText) {
SqlBuilder.select( return builder
SQLConstants.IapDatahub.DataSource.DS_NAME_A,
SQLConstants.IapDatahub.DataSource.SCHEMA_NAME_A,
SQLConstants.IapDatahub.DataSourceTable.TABLE_NAME_A,
SQLConstants.IapDatahub.DataSourceTable.TABLE_TYPE_A,
SQLConstants.IapDatahub.DataSourceTableField.FIELD_NAME_A,
SQLConstants.IapDatahub.DataSourceTableField.FIELD_SEQ_A,
SQLConstants.IapDatahub.DataSourceTableField.FIELD_TYPE_A,
SQLConstants.IapDatahub.DataSourceTableField.PRIMARY_KEY_A,
SQLConstants.IapDatahub.DataSourceTableField.PARTITION_KEY_A,
SQLConstants.IapDatahub.DataSourceTableField.LENGTH_A,
TbAppCollectTableInfo.TGT_DB_A,
TbAppCollectTableInfo.TGT_TABLE_A,
TbAppCollectTableInfo.TGT_TABLE_TYPE_A,
TbAppCollectTableInfo.TGT_HDFS_PATH_A,
TbAppHudiJobConfig.WRITE_TASKS_A,
TbAppHudiJobConfig.WRITE_OPERATION_A,
TbAppHudiJobConfig.WRITE_TASK_MAX_MEMORY_A,
TbAppHudiJobConfig.WRITE_BATCH_SIZE_A,
TbAppHudiJobConfig.WRITE_RATE_LIMIT_A,
TbAppCollectTableInfo.BUCKET_NUMBER_A,
TbAppHudiJobConfig.COMPACTION_STRATEGY_A,
TbAppHudiJobConfig.COMPACTION_TASKS_A,
TbAppHudiJobConfig.COMPACTION_DELTA_COMMITS_A,
TbAppHudiJobConfig.COMPACTION_DELTA_SECONDS_A,
TbAppHudiJobConfig.COMPACTION_ASYNC_ENABLED_A,
TbAppHudiJobConfig.COMPACTION_MAX_MEMORY_A,
TbAppHudiJobConfig.CONFIGS_A,
TbAppCollectTableInfo.FILTER_FIELD_A,
TbAppCollectTableInfo.FILTER_VALUES_A,
TbAppCollectTableInfo.FILTER_TYPE_A,
TbAppCollectTableInfo.SRC_TOPIC_A,
TbAppCollectTableInfo.SRC_PULSAR_ADDR_A,
Alias.of("tayjc_sync.job_manager_memory", "sync_job_manager_memory"),
Alias.of("tayjc_sync.task_manager_memory", "sync_task_manager_memory"),
Alias.of("tayjc_compaction.job_manager_memory", "compaction_job_manager_memory"),
Alias.of("tayjc_compaction.task_manager_memory", "compaction_task_manger_momory"),
TbAppCollectTableInfo.PARTITION_FIELD_A,
TbAppHudiSyncState.MESSAGE_ID_A,
TbAppGlobalConfig.METRIC_PUBLISH_URL_A,
TbAppGlobalConfig.METRIC_PROMETHEUS_URL_A,
TbAppGlobalConfig.METRIC_API_URL_A,
TbAppGlobalConfig.METRIC_PUBLISH_DELAY_A,
TbAppGlobalConfig.METRIC_PUBLISH_PERIOD_A,
TbAppGlobalConfig.METRIC_PUBLISH_TIMEOUT_A,
TbAppGlobalConfig.METRIC_PUBLISH_BATCH_A,
Alias.of(TbAppFlinkJobConfig.ID_A, "job_id"),
Alias.of(TbAppFlinkJobConfig.NAME_A, "job_name"),
TbAppGlobalConfig.CHECKPOINT_ROOT_PATH_A,
TbAppHudiJobConfig.SOURCE_TASKS_A,
TbAppCollectTableInfo.ALIAS_A,
SQLConstants.IapDatahub.DataSource.CONNECTION_A,
TbAppCollectTableInfo.PRIORITY_A,
SQLConstants.IapDatahub.DataSource.DS_TYPE_A,
TbAppHudiJobConfig.KEEP_FILE_VERSION_A,
TbAppHudiJobConfig.KEEP_COMMIT_VERSION_A,
TbAppCollectTableInfo.TAGS_A,
TbAppGlobalConfig.ZK_URL_A,
TbAppCollectTableInfo.VERSION_A,
SQLConstants.IapDatahub.DataSourceTableField.SCALE_A
)
.from( .from(
SQLConstants.IapDatahub.DataSource._alias_, SQLConstants.IapDatahub.DataSource._alias_,
SQLConstants.IapDatahub.DataSourceTable._alias_, SQLConstants.IapDatahub.DataSourceTable._alias_,
@@ -114,13 +53,31 @@ public class SqlBuilderTests {
.andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A)) .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A))
.andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
.andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
.andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) .andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText)
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y) .andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y)
.andEq("tayjc_sync.status", STATUS_Y) .andEq("tayjc_sync.status", STATUS_Y)
.andEq("tayjc_compaction.status", STATUS_Y) .andEq("tayjc_compaction.status", STATUS_Y)
.orderBy(SQLConstants.IapDatahub.DataSourceTableField.FIELD_SEQ_A) .orderBy(SQLConstants.IapDatahub.DataSourceTableField.FIELD_SEQ_A);
}
public static void main(String[] args) {
System.out.println(SqlUtil.formatSql(
generateTableMetaList(
SqlBuilder.select(
TbAppFlinkJobConfig.ID_A,
TbAppFlinkJobConfig.NAME_A,
TbAppCollectTableInfo.ALIAS_A,
TbAppCollectTableInfo.SRC_SCHEMA_A,
TbAppCollectTableInfo.SRC_TABLE_A,
TbAppCollectTableInfo.TGT_DB_A,
TbAppCollectTableInfo.TGT_TABLE_A,
TbAppCollectTableInfo.TGT_HDFS_PATH_A
),
flinkJobId,
alias
)
.build() .build()
)); ));
} }