From c2cdfac2690144dba95970629ae64c5d9042b853 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 27 Dec 2023 18:17:38 +0800 Subject: [PATCH] =?UTF-8?q?refactor(info-query):=20=E8=BD=AC=E6=8D=A2?= =?UTF-8?q?=E6=97=A7=E9=80=BB=E8=BE=91=E5=88=B0=E6=96=B0=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/info/service/InfoService.java | 52 ++++++- .../test/java/com/test/SqlBuilderTests.java | 128 +++--------------- 2 files changed, 71 insertions(+), 109 deletions(-) diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index 88c0657..3b1ca23 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -11,12 +11,14 @@ import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.entity.FlinkJob; import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.eshore.odcp.hudi.connector.exception.SyncStateNotFoundException; import com.eshore.odcp.hudi.connector.utils.database.DatabaseService; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.*; import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider; import java.sql.Timestamp; import java.util.List; +import java.util.function.Function; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.map.ImmutableMap; @@ -78,7 +80,55 @@ public class InfoService { @Cacheable(value = "sync-state", sync = true, key = "#flinkJobId.toString()+#alias") @Retryable(Throwable.class) public SyncState syncState(Long flinkJobId, String alias) { - return databaseService.getSyncState(flinkJobId, alias); + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + SqlBuilder.select( + TbAppFlinkJobConfig.ID_A, + TbAppCollectTableInfo.ALIAS_A, + TbAppHudiSyncState.MESSAGE_ID_A, + TbAppHudiSyncState.SOURCE_START_TIME_A, + TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_A, + TbAppHudiSyncState.SOURCE_PUBLISH_TIME_A, + TbAppHudiSyncState.SOURCE_OP_TIME_A, + TbAppHudiSyncState.COMPACTION_START_TIME_A, + TbAppHudiSyncState.COMPACTION_FINISH_TIME_A, + TbAppHudiSyncState.COMPACTION_APPLICATION_ID_A, + TbAppHudiSyncState.COMPACTION_STATUS_A, + TbAppHudiSyncState.COMPACTION_STATUS_TIME_A, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_A + ) + .from( + TbAppFlinkJobConfig._alias_, + TbAppCollectTableInfo._alias_, + TbAppHudiSyncState._alias_ + ) + .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) + .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) + .build(), + (rs, row) -> { + Function dateConvertor = timestamp -> timestamp == null ? 0 : timestamp.getTime(); + return SyncState.builder() + .flinkJobId(rs.getLong(1)) + .alias(rs.getString(2)) + .messageId(rs.getString(3)) + .sourceStartTime(dateConvertor.apply(rs.getTimestamp(4))) + .sourceCheckpointTime(dateConvertor.apply(rs.getTimestamp(5))) + .sourcePublishTime(dateConvertor.apply(rs.getTimestamp(6))) + .sourceOperationTime(dateConvertor.apply(rs.getTimestamp(7))) + .compactionStartTime(dateConvertor.apply(rs.getTimestamp(8))) + .compactionFinishTime(dateConvertor.apply(rs.getTimestamp(9))) + .compactionApplicationId(rs.getString(10)) + .compactionStatus(rs.getString(11)) + .compactionStatusTime(dateConvertor.apply(rs.getTimestamp(12))) + .compactionLatestOperationTime(dateConvertor.apply(rs.getTimestamp(13))) + .build(); + } + ) + ).getFirstOptional().orElseThrow(SyncStateNotFoundException::new); } private SqlBuilder generateJobIdAndAliasCriteria( diff --git a/service-info-query/src/test/java/com/test/SqlBuilderTests.java b/service-info-query/src/test/java/com/test/SqlBuilderTests.java index 8b69f2a..784a8f8 100644 --- a/service-info-query/src/test/java/com/test/SqlBuilderTests.java +++ b/service-info-query/src/test/java/com/test/SqlBuilderTests.java @@ -14,123 +14,35 @@ import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; */ public class SqlBuilderTests { public static void main(String[] args) { - /*System.out.println(SqlFormatter.format( + Long flinkJobId = 100086L; + String alias = "hello_world"; + System.out.println(SqlUtil.formatSql( SqlBuilder.select( - DataSource.DS_NAME_A, - DataSource.SCHEMA_NAME_A, - DataSourceTable.TABLE_NAME_A, - DataSourceTable.TABLE_TYPE_A, - DataSourceTableField.FIELD_NAME_A, - DataSourceTableField.FIELD_SEQ_A, - DataSourceTableField.FIELD_TYPE_A, - DataSourceTableField.PRIMARY_KEY_A, - DataSourceTableField.PARTITION_KEY_A, - DataSourceTableField.LENGTH_A, - TbAppCollectTableInfo.TGT_DB_A, - TbAppCollectTableInfo.TGT_TABLE_A, - TbAppCollectTableInfo.TGT_TABLE_TYPE_A, - TbAppCollectTableInfo.TGT_HDFS_PATH_A, - TbAppHudiJobConfig.WRITE_TASKS_A, - TbAppHudiJobConfig.WRITE_OPERATION_A, - TbAppHudiJobConfig.WRITE_TASK_MAX_MEMORY_A, - TbAppHudiJobConfig.WRITE_BATCH_SIZE_A, - TbAppHudiJobConfig.WRITE_RATE_LIMIT_A, - TbAppCollectTableInfo.BUCKET_NUMBER_A, - TbAppHudiJobConfig.COMPACTION_STRATEGY_A, - TbAppHudiJobConfig.COMPACTION_TASKS_A, - TbAppHudiJobConfig.COMPACTION_DELTA_COMMITS_A, - TbAppHudiJobConfig.COMPACTION_DELTA_SECONDS_A, - TbAppHudiJobConfig.COMPACTION_ASYNC_ENABLED_A, - TbAppHudiJobConfig.COMPACTION_MAX_MEMORY_A, - TbAppHudiJobConfig.CONFIGS_A, - TbAppCollectTableInfo.FILTER_FIELD_A, - TbAppCollectTableInfo.FILTER_VALUES_A, - TbAppCollectTableInfo.FILTER_TYPE_A, - TbAppCollectTableInfo.SRC_TOPIC_A, - TbAppCollectTableInfo.SRC_PULSAR_ADDR_A, - Alias.of("sync_config." + TbAppYarnJobConfig.JOB_MANAGER_MEMORY_O, "sync_job_manager_memory"), - Alias.of("sync_config." + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_O, "sync_task_manager_memory"), - Alias.of("compaction_config." + TbAppYarnJobConfig.JOB_MANAGER_MEMORY_O, "compaction_job_manager_memory"), - Alias.of("compaction_config." + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_O, "compaction_task_manger_momory"), - TbAppCollectTableInfo.PARTITION_FIELD_A, - TbAppHudiSyncState.MESSAGE_ID_A, - TbAppGlobalConfig.METRIC_PUBLISH_URL_A, - TbAppGlobalConfig.METRIC_PROMETHEUS_URL_A, - TbAppGlobalConfig.METRIC_API_URL_A, - TbAppGlobalConfig.METRIC_PUBLISH_DELAY_A, - TbAppGlobalConfig.METRIC_PUBLISH_PERIOD_A, - TbAppGlobalConfig.METRIC_PUBLISH_TIMEOUT_A, - TbAppGlobalConfig.METRIC_PUBLISH_BATCH_A, - Alias.of(TbAppFlinkJobConfig.ID_A, "job_id"), - Alias.of(TbAppFlinkJobConfig.NAME_A, "job_name"), - TbAppGlobalConfig.CHECKPOINT_ROOT_PATH_A, - TbAppHudiJobConfig.SOURCE_TASKS_A, + TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A, - DataSource.CONNECTION_A, - TbAppCollectTableInfo.PRIORITY_A, - DataSource.DS_TYPE_A, - TbAppHudiJobConfig.KEEP_FILE_VERSION_A, - TbAppHudiJobConfig.KEEP_COMMIT_VERSION_A, - TbAppCollectTableInfo.TAGS_A, - TbAppGlobalConfig.ZK_URL_A + TbAppHudiSyncState.MESSAGE_ID_A, + TbAppHudiSyncState.SOURCE_START_TIME_A, + TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_A, + TbAppHudiSyncState.SOURCE_PUBLISH_TIME_A, + TbAppHudiSyncState.SOURCE_OP_TIME_A, + TbAppHudiSyncState.COMPACTION_START_TIME_A, + TbAppHudiSyncState.COMPACTION_FINISH_TIME_A, + TbAppHudiSyncState.COMPACTION_APPLICATION_ID_A, + TbAppHudiSyncState.COMPACTION_STATUS_A, + TbAppHudiSyncState.COMPACTION_STATUS_TIME_A, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_A ) .from( - DataSource._alias_, - DataSourceTable._alias_, - DataSourceTableField._alias_, TbAppFlinkJobConfig._alias_, - TbAppHudiJobConfig._alias_, - Alias.of(TbAppYarnJobConfig._origin_, "sync_config"), - Alias.of(TbAppYarnJobConfig._origin_, "compaction_config"), - TbAppGlobalConfig._alias_, - TbAppCollectTableInfo._alias_ + TbAppCollectTableInfo._alias_, + TbAppHudiSyncState._alias_ ) - .leftJoin(TbAppHudiSyncState._alias_) - .onEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A))) - .whereEq(DataSource.DS_ROLE_A, "src") - .andEq(DataSource.DS_STATE_A, "y") - .andEq(DataSource.RECORD_STATE_A, "y") - .andEq(DataSourceTable.DS_ID_A, Column.as(DataSource.DS_ID_A)) - .andEq(DataSourceTable.RECORD_STATE_A, "y") - .andEq(DataSourceTableField.TABLE_ID_A, Column.as(DataSourceTable.TABLE_ID_A)) - .andEq(DataSourceTableField.RECORD_STATE_A, "y") - .andIn(DataSource.DS_TYPE_A, "udal", "telepg") - .andEq(DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A)) - .andEq(DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A)) - .andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A)) - .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) - .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A)) - .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("sync_config." + TbAppYarnJobConfig.ID_O)) - .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("compaction_config." + TbAppYarnJobConfig.ID_O)) - .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A)) - .andEq(TbAppFlinkJobConfig.ID_A, 1542097984132706304L) - .andEq(TbAppCollectTableInfo.ALIAS_A, "crm_cfguse_channel") - .andEq(TbAppCollectTableInfo.STATUS_A, "y") - .andEq(TbAppFlinkJobConfig.STATUS_A, "y") - .andEq(TbAppHudiJobConfig.STATUS_A, "y") - .andEq("sync_config." + TbAppYarnJobConfig.STATUS_O, "y") - .andEq("compaction_config." + TbAppYarnJobConfig.STATUS_O, "y") - .orderBy(DataSourceTableField.FIELD_SEQ_A) - .build() - ));*/ - System.out.println(SqlUtil.formatSql( - SqlBuilder.select("*") - .from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_, TbAppCollectTableVersion._alias_) .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) + .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) .andEq(TbAppFlinkJobConfig.STATUS_A, "y") .andEq(TbAppCollectTableInfo.STATUS_A, "y") - .andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) - .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) - .andEq(TbAppCollectTableInfo.PRIORITY_A, 10000) - .andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false) - .andEq(TbAppCollectTableVersion.VERSION_A, "2018") - /*.join(TbAppCollectTableVersion._alias_) - .onEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A)) - .andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A)) - .whereLt(TbAppCollectTableInfo.PRIORITY_A, 10000) - .andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false) - .andEq(TbAppCollectTableVersion.VERSION_A, "2018") - .andEq(TbAppCollectTableInfo.STATUS_A, "y")*/ + .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) + .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) .build() )); }