From aa3863de1738c9dc0bffbcac220df449cdb3547f Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 24 Jan 2024 12:47:56 +0800 Subject: [PATCH] =?UTF-8?q?fix(info-query):=20=E4=BF=AE=E5=A4=8D=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E8=A1=A8=E8=AF=A6=E6=83=85=E4=B8=8D=E4=B8=A5=E8=B0=A8?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E5=B0=86=E4=B8=8D=E5=AE=8C=E6=95=B4=E7=9A=84?= =?UTF-8?q?=E8=A1=A8=E4=B9=9F=E5=BD=93=E4=BD=9C=E6=AD=A3=E5=B8=B8=E8=A1=A8?= =?UTF-8?q?=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 不完整的表插入压缩队列后导致处理队列的时间变长 --- .../info/service/TableMetaService.java | 194 +++++++++--------- .../test/java/com/test/SqlBuilderTests.java | 161 ++++++--------- 2 files changed, 161 insertions(+), 194 deletions(-) diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java index 87a54de..b265c3e 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.info.service; +import club.kingon.sql.builder.SelectSqlBuilder; import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Column; @@ -65,10 +66,70 @@ public class TableMetaService extends BaseService { return tableMetaList(flinkJobId, null); } + private SqlBuilder generateTableMetaList(SelectSqlBuilder builder) { + return generateTableMetaList(builder, null, null, null, null); + } + + private SqlBuilder generateTableMetaList(SelectSqlBuilder builder, Integer priority) { + return generateTableMetaList(builder, null, null, priority, null); + } + + private SqlBuilder generateTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText) { + return generateTableMetaList(builder, flinkJobId, aliasText, null, null); + } + + private SqlBuilder generateTableMetaList(SelectSqlBuilder builder, String hdfs) { + return generateTableMetaList(builder, null, null, null, hdfs); + } + + private SqlBuilder generateTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText, Integer priority, String hdfs) { + return builder + .from( + DataSource._alias_, + DataSourceTable._alias_, + DataSourceTableField._alias_, + TbAppFlinkJobConfig._alias_, + TbAppHudiJobConfig._alias_, + Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"), + Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"), + TbAppGlobalConfig._alias_, + TbAppCollectTableInfo._alias_, + TbAppHudiSyncState._alias_ + ) + .whereEq(DataSource.DS_ROLE_A, "src") + .andEq(DataSource.DS_STATE_A, STATUS_Y) + .andEq(DataSource.RECORD_STATE_A, STATUS_Y) + .andEq(DataSourceTable.DS_ID_A, Column.as(DataSource.DS_ID_A)) + .andEq(DataSourceTable.RECORD_STATE_A, STATUS_Y) + .andEq(DataSourceTableField.TABLE_ID_A, Column.as(DataSourceTable.TABLE_ID_A)) + .andEq(DataSourceTableField.RECORD_STATE_A, STATUS_Y) + .andIn(DataSource.DS_TYPE_A, "udal", "telepg") + .andEq(DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A)) + .andEq(DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A)) + .andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A)) + .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) + .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A)) + .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id")) + .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id")) + .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A)) + .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) + .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) + .andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText) + .andEq(ObjectUtil.isNotNull(priority), TbAppCollectTableInfo.PRIORITY_A, priority) + .andEq(StrUtil.isNotBlank(hdfs), TbAppCollectTableInfo.TGT_HDFS_PATH_A, hdfs) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y) + .andEq("tayjc_sync.status", STATUS_Y) + .andEq("tayjc_compaction.status", STATUS_Y) + .orderBy(DataSourceTableField.FIELD_SEQ_A); + } + private ImmutableList tableMetaList(Long flinkJobId, String aliasText) { return Lists.immutable.ofAll( mysqlJdbcTemplate.query( - SqlBuilder.select( + generateTableMetaList( + SqlBuilder.select( DataSource.DS_NAME_A, DataSource.SCHEMA_NAME_A, DataSourceTable.TABLE_NAME_A, @@ -128,45 +189,10 @@ public class TableMetaService extends BaseService { TbAppGlobalConfig.ZK_URL_A, TbAppCollectTableInfo.VERSION_A, DataSourceTableField.SCALE_A - ) - .from( - DataSource._alias_, - DataSourceTable._alias_, - DataSourceTableField._alias_, - TbAppFlinkJobConfig._alias_, - TbAppHudiJobConfig._alias_, - Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"), - Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"), - TbAppGlobalConfig._alias_, - TbAppCollectTableInfo._alias_, - TbAppHudiSyncState._alias_ - ) - .whereEq(DataSource.DS_ROLE_A, "src") - .andEq(DataSource.DS_STATE_A, STATUS_Y) - .andEq(DataSource.RECORD_STATE_A, STATUS_Y) - .andEq(DataSourceTable.DS_ID_A, Column.as(DataSource.DS_ID_A)) - .andEq(DataSourceTable.RECORD_STATE_A, STATUS_Y) - .andEq(DataSourceTableField.TABLE_ID_A, Column.as(DataSourceTable.TABLE_ID_A)) - .andEq(DataSourceTableField.RECORD_STATE_A, STATUS_Y) - .andIn(DataSource.DS_TYPE_A, "udal", "telepg") - .andEq(DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A)) - .andEq(DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A)) - .andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A)) - .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) - .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A)) - .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id")) - .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id")) - .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A)) - .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) - .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) - .andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText) - .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y) - .andEq("tayjc_sync.status", STATUS_Y) - .andEq("tayjc_compaction.status", STATUS_Y) - .orderBy(DataSourceTableField.FIELD_SEQ_A) - .build(), + ), + flinkJobId, + aliasText + ).build(), (rs, row) -> TableMeta.RowMeta.builder() .dsName(rs.getString(1)) .schemaName(rs.getString(2)) @@ -456,10 +482,9 @@ public class TableMetaService extends BaseService { @Retryable(Throwable.class) public Long tableCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .build(), + generateTableMetaList( + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)) + ).build(), Long.class ); } @@ -468,11 +493,10 @@ public class TableMetaService extends BaseService { @Retryable(Throwable.class) public Long tableFocusCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) - .build(), + generateTableMetaList( + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {}))", TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A)), + 10000 + ).build(), Long.class ); } @@ -481,10 +505,9 @@ public class TableMetaService extends BaseService { @Retryable(Throwable.class) public Long hudiCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .build(), + generateTableMetaList( + SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)) + ).build(), Long.class ); } @@ -493,11 +516,10 @@ public class TableMetaService extends BaseService { @Retryable(Throwable.class) public Long hudiFocusCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) - .build(), + generateTableMetaList( + SqlBuilder.select(StrUtil.format("count(distinct {}) as count", TbAppCollectTableInfo.TGT_HDFS_PATH_A)), + 10000 + ).build(), Long.class ); } @@ -506,10 +528,9 @@ public class TableMetaService extends BaseService { @Retryable(Throwable.class) public Long hiveCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .build(), + generateTableMetaList( + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)) + ).build(), Long.class ); } @@ -518,11 +539,10 @@ public class TableMetaService extends BaseService { @Retryable(Throwable.class) public Long hiveFocusCount() { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)) - .from(TbAppCollectTableInfo._alias_) - .whereEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andGe(TbAppCollectTableInfo.PRIORITY_A, 10000) - .build(), + generateTableMetaList( + SqlBuilder.select(StrUtil.format("count(distinct concat({}, {})) as count", TbAppCollectTableInfo.HIVE_DB_A, TbAppCollectTableInfo.HIVE_TABLE_A)), + 10000 + ).build(), Long.class ); } @@ -532,8 +552,8 @@ public class TableMetaService extends BaseService { public ImmutableList simpleTableMetas(Long flinkJobId, String alias) { return Lists.immutable.ofAll( mysqlJdbcTemplate.query( - SqlBuilder - .select( + generateTableMetaList( + SqlBuilder.select( TbAppFlinkJobConfig.ID_A, TbAppFlinkJobConfig.NAME_A, TbAppCollectTableInfo.ALIAS_A, @@ -542,14 +562,10 @@ public class TableMetaService extends BaseService { TbAppCollectTableInfo.TGT_DB_A, TbAppCollectTableInfo.TGT_TABLE_A, TbAppCollectTableInfo.TGT_HDFS_PATH_A - ) - .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_) - .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) - .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) - .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) - .build(), + ), + flinkJobId, + alias + ).build(), (rs, row) -> new SimpleTableMeta( rs.getLong(1), rs.getString(2), @@ -568,14 +584,11 @@ public class TableMetaService extends BaseService { @Retryable(Throwable.class) public Boolean existsTable(Long flinkJobId, String alias) { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select("count(*) > 0") - .from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_) - .whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) - .andEq(TbAppFlinkJobConfig.ID_A, flinkJobId) - .andEq(TbAppCollectTableInfo.ALIAS_A, alias) - .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .build(), + generateTableMetaList( + SqlBuilder.select("count(*) > 0"), + flinkJobId, + alias + ).build(), Boolean.class ); } @@ -584,13 +597,10 @@ public class TableMetaService extends BaseService { @Retryable(Throwable.class) public Boolean existsTableByHdfs(String hdfs) { return mysqlJdbcTemplate.queryForObject( - SqlBuilder.select("count(*) > 0") - .from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_) - .whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) - .andEq(TbAppCollectTableInfo.TGT_HDFS_PATH_A, hdfs) - .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .build(), + generateTableMetaList( + SqlBuilder.select("count(*) > 0"), + hdfs + ).build(), Boolean.class ); } diff --git a/service-info-query/src/test/java/com/test/SqlBuilderTests.java b/service-info-query/src/test/java/com/test/SqlBuilderTests.java index 25f3626..71b9d75 100644 --- a/service-info-query/src/test/java/com/test/SqlBuilderTests.java +++ b/service-info-query/src/test/java/com/test/SqlBuilderTests.java @@ -1,5 +1,6 @@ package com.test; +import club.kingon.sql.builder.SelectSqlBuilder; import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Column; @@ -9,118 +10,74 @@ import cn.hutool.db.sql.SqlUtil; import com.eshore.odcp.hudi.connector.SQLConstants; import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; -import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig; -import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppYarnJobConfig; /** * @author lanyuanxiaoyao * @date 2023-06-07 */ public class SqlBuilderTests { + private static Long flinkJobId = 100086L; + private static String alias = "hello_world"; + private static String STATUS_Y = "y"; + private static String STATUS_N = "n"; + + private static SqlBuilder generateTableMetaList(SelectSqlBuilder builder, Long flinkJobId, String aliasText) { + return builder + .from( + SQLConstants.IapDatahub.DataSource._alias_, + SQLConstants.IapDatahub.DataSourceTable._alias_, + SQLConstants.IapDatahub.DataSourceTableField._alias_, + TbAppFlinkJobConfig._alias_, + TbAppHudiJobConfig._alias_, + Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"), + Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"), + TbAppGlobalConfig._alias_, + TbAppCollectTableInfo._alias_, + TbAppHudiSyncState._alias_ + ) + .whereEq(SQLConstants.IapDatahub.DataSource.DS_ROLE_A, "src") + .andEq(SQLConstants.IapDatahub.DataSource.DS_STATE_A, STATUS_Y) + .andEq(SQLConstants.IapDatahub.DataSource.RECORD_STATE_A, STATUS_Y) + .andEq(SQLConstants.IapDatahub.DataSourceTable.DS_ID_A, Column.as(SQLConstants.IapDatahub.DataSource.DS_ID_A)) + .andEq(SQLConstants.IapDatahub.DataSourceTable.RECORD_STATE_A, STATUS_Y) + .andEq(SQLConstants.IapDatahub.DataSourceTableField.TABLE_ID_A, Column.as(SQLConstants.IapDatahub.DataSourceTable.TABLE_ID_A)) + .andEq(SQLConstants.IapDatahub.DataSourceTableField.RECORD_STATE_A, STATUS_Y) + .andIn(SQLConstants.IapDatahub.DataSource.DS_TYPE_A, "udal", "telepg") + .andEq(SQLConstants.IapDatahub.DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A)) + .andEq(SQLConstants.IapDatahub.DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A)) + .andEq(SQLConstants.IapDatahub.DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A)) + .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) + .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A)) + .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id")) + .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id")) + .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A)) + .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) + .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) + .andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y) + .andEq("tayjc_sync.status", STATUS_Y) + .andEq("tayjc_compaction.status", STATUS_Y) + .orderBy(SQLConstants.IapDatahub.DataSourceTableField.FIELD_SEQ_A); + } + public static void main(String[] args) { - Long flinkJobId = 100086L; - String alias = "hello_world"; - String STATUS_Y = "y"; - String STATUS_N = "n"; System.out.println(SqlUtil.formatSql( - SqlBuilder.select( - SQLConstants.IapDatahub.DataSource.DS_NAME_A, - SQLConstants.IapDatahub.DataSource.SCHEMA_NAME_A, - SQLConstants.IapDatahub.DataSourceTable.TABLE_NAME_A, - SQLConstants.IapDatahub.DataSourceTable.TABLE_TYPE_A, - SQLConstants.IapDatahub.DataSourceTableField.FIELD_NAME_A, - SQLConstants.IapDatahub.DataSourceTableField.FIELD_SEQ_A, - SQLConstants.IapDatahub.DataSourceTableField.FIELD_TYPE_A, - SQLConstants.IapDatahub.DataSourceTableField.PRIMARY_KEY_A, - SQLConstants.IapDatahub.DataSourceTableField.PARTITION_KEY_A, - SQLConstants.IapDatahub.DataSourceTableField.LENGTH_A, + generateTableMetaList( + SqlBuilder.select( + TbAppFlinkJobConfig.ID_A, + TbAppFlinkJobConfig.NAME_A, + TbAppCollectTableInfo.ALIAS_A, + TbAppCollectTableInfo.SRC_SCHEMA_A, + TbAppCollectTableInfo.SRC_TABLE_A, TbAppCollectTableInfo.TGT_DB_A, TbAppCollectTableInfo.TGT_TABLE_A, - TbAppCollectTableInfo.TGT_TABLE_TYPE_A, - TbAppCollectTableInfo.TGT_HDFS_PATH_A, - TbAppHudiJobConfig.WRITE_TASKS_A, - TbAppHudiJobConfig.WRITE_OPERATION_A, - TbAppHudiJobConfig.WRITE_TASK_MAX_MEMORY_A, - TbAppHudiJobConfig.WRITE_BATCH_SIZE_A, - TbAppHudiJobConfig.WRITE_RATE_LIMIT_A, - TbAppCollectTableInfo.BUCKET_NUMBER_A, - TbAppHudiJobConfig.COMPACTION_STRATEGY_A, - TbAppHudiJobConfig.COMPACTION_TASKS_A, - TbAppHudiJobConfig.COMPACTION_DELTA_COMMITS_A, - TbAppHudiJobConfig.COMPACTION_DELTA_SECONDS_A, - TbAppHudiJobConfig.COMPACTION_ASYNC_ENABLED_A, - TbAppHudiJobConfig.COMPACTION_MAX_MEMORY_A, - TbAppHudiJobConfig.CONFIGS_A, - TbAppCollectTableInfo.FILTER_FIELD_A, - TbAppCollectTableInfo.FILTER_VALUES_A, - TbAppCollectTableInfo.FILTER_TYPE_A, - TbAppCollectTableInfo.SRC_TOPIC_A, - TbAppCollectTableInfo.SRC_PULSAR_ADDR_A, - Alias.of("tayjc_sync.job_manager_memory", "sync_job_manager_memory"), - Alias.of("tayjc_sync.task_manager_memory", "sync_task_manager_memory"), - Alias.of("tayjc_compaction.job_manager_memory", "compaction_job_manager_memory"), - Alias.of("tayjc_compaction.task_manager_memory", "compaction_task_manger_momory"), - TbAppCollectTableInfo.PARTITION_FIELD_A, - TbAppHudiSyncState.MESSAGE_ID_A, - TbAppGlobalConfig.METRIC_PUBLISH_URL_A, - TbAppGlobalConfig.METRIC_PROMETHEUS_URL_A, - TbAppGlobalConfig.METRIC_API_URL_A, - TbAppGlobalConfig.METRIC_PUBLISH_DELAY_A, - TbAppGlobalConfig.METRIC_PUBLISH_PERIOD_A, - TbAppGlobalConfig.METRIC_PUBLISH_TIMEOUT_A, - TbAppGlobalConfig.METRIC_PUBLISH_BATCH_A, - Alias.of(TbAppFlinkJobConfig.ID_A, "job_id"), - Alias.of(TbAppFlinkJobConfig.NAME_A, "job_name"), - TbAppGlobalConfig.CHECKPOINT_ROOT_PATH_A, - TbAppHudiJobConfig.SOURCE_TASKS_A, - TbAppCollectTableInfo.ALIAS_A, - SQLConstants.IapDatahub.DataSource.CONNECTION_A, - TbAppCollectTableInfo.PRIORITY_A, - SQLConstants.IapDatahub.DataSource.DS_TYPE_A, - TbAppHudiJobConfig.KEEP_FILE_VERSION_A, - TbAppHudiJobConfig.KEEP_COMMIT_VERSION_A, - TbAppCollectTableInfo.TAGS_A, - TbAppGlobalConfig.ZK_URL_A, - TbAppCollectTableInfo.VERSION_A, - SQLConstants.IapDatahub.DataSourceTableField.SCALE_A - ) - .from( - SQLConstants.IapDatahub.DataSource._alias_, - SQLConstants.IapDatahub.DataSourceTable._alias_, - SQLConstants.IapDatahub.DataSourceTableField._alias_, - TbAppFlinkJobConfig._alias_, - TbAppHudiJobConfig._alias_, - Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"), - Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"), - TbAppGlobalConfig._alias_, - TbAppCollectTableInfo._alias_, - TbAppHudiSyncState._alias_ - ) - .whereEq(SQLConstants.IapDatahub.DataSource.DS_ROLE_A, "src") - .andEq(SQLConstants.IapDatahub.DataSource.DS_STATE_A, STATUS_Y) - .andEq(SQLConstants.IapDatahub.DataSource.RECORD_STATE_A, STATUS_Y) - .andEq(SQLConstants.IapDatahub.DataSourceTable.DS_ID_A, Column.as(SQLConstants.IapDatahub.DataSource.DS_ID_A)) - .andEq(SQLConstants.IapDatahub.DataSourceTable.RECORD_STATE_A, STATUS_Y) - .andEq(SQLConstants.IapDatahub.DataSourceTableField.TABLE_ID_A, Column.as(SQLConstants.IapDatahub.DataSourceTable.TABLE_ID_A)) - .andEq(SQLConstants.IapDatahub.DataSourceTableField.RECORD_STATE_A, STATUS_Y) - .andIn(SQLConstants.IapDatahub.DataSource.DS_TYPE_A, "udal", "telepg") - .andEq(SQLConstants.IapDatahub.DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A)) - .andEq(SQLConstants.IapDatahub.DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A)) - .andEq(SQLConstants.IapDatahub.DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A)) - .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) - .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A)) - .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id")) - .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id")) - .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A)) - .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) - .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) - .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) - .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) - .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) - .andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y) - .andEq("tayjc_sync.status", STATUS_Y) - .andEq("tayjc_compaction.status", STATUS_Y) - .orderBy(SQLConstants.IapDatahub.DataSourceTableField.FIELD_SEQ_A) + TbAppCollectTableInfo.TGT_HDFS_PATH_A + ), + flinkJobId, + alias + ) .build() )); }