diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index 3b1ca23..4ab086b 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -11,6 +11,7 @@ import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.entity.FlinkJob; import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.eshore.odcp.hudi.connector.exception.FlinkJobNotFoundException; import com.eshore.odcp.hudi.connector.exception.SyncStateNotFoundException; import com.eshore.odcp.hudi.connector.utils.database.DatabaseService; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; @@ -37,7 +38,7 @@ import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; * @author lanyuanxiaoyao * @date 2023-04-24 */ -@SuppressWarnings("OptionalGetWithoutIsPresent") +@SuppressWarnings({"OptionalGetWithoutIsPresent", "SqlSourceToSinkFlow"}) @CacheConfig(cacheManager = "normal-cache") @Service public class InfoService { @@ -260,19 +261,59 @@ public class InfoService { @Cacheable(value = "job-metas", sync = true) @Retryable(Throwable.class) public ImmutableList jobAndMetas() { - return databaseService.findAllFlinkJob().collect(job -> new JobAndMetas(job, databaseService.findTableMeta(job.getId()))); + return flinkJobs() + .collect(job -> new JobAndMetas(job, databaseService.findTableMeta(job.getId()))); + } + + private ImmutableList flinkJobs(Long flinkJobId) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + SqlBuilder.select( + TbAppFlinkJobConfig.ID_A, + TbAppFlinkJobConfig.NAME_A, + TbAppFlinkJobConfig.RUN_MODE_A, + TbAppYarnJobConfig.JOB_MANAGER_MEMORY_A, + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_A + ) + .from(TbAppFlinkJobConfig._alias_, TbAppYarnJobConfig._alias_) + .whereEq(TbAppFlinkJobConfig.ONE_IN_ONE_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfig.ID_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) + .build(), + (rs, row) -> { + String runModeText = rs.getString(3); + FlinkJob.RunMode mode; + try { + mode = FlinkJob.RunMode.valueOf(runModeText); + } catch (IllegalArgumentException e) { + mode = FlinkJob.RunMode.ALL_IN_ONE; + } + TableMeta.YarnMeta yarnMeta = TableMeta.YarnMeta.builder() + .jobManagerMemory(rs.getInt(4)) + .taskManagerMemory(rs.getInt(5)) + .build(); + return FlinkJob.builder() + .id(rs.getLong(1)) + .name(rs.getString(2)) + .runMode(mode) + .oneInOneSyncYarn(yarnMeta) + .build(); + }) + ); } @Cacheable(value = "flink-jobs", sync = true) @Retryable(Throwable.class) public ImmutableList flinkJobs() { - return databaseService.findAllFlinkJob(); + return flinkJobs(null); } @Cacheable(value = "flink-jobs", sync = true, key = "#flinkJobId") @Retryable(Throwable.class) public FlinkJob flinkJob(Long flinkJobId) { - return databaseService.getFlinkJob(flinkJobId); + return flinkJobs(flinkJobId) + .getFirstOptional() + .orElseThrow(FlinkJobNotFoundException::new); } @Cacheable(value = "table-metas", sync = true) diff --git a/service-info-query/src/test/java/com/test/SqlBuilderTests.java b/service-info-query/src/test/java/com/test/SqlBuilderTests.java index 784a8f8..8307130 100644 --- a/service-info-query/src/test/java/com/test/SqlBuilderTests.java +++ b/service-info-query/src/test/java/com/test/SqlBuilderTests.java @@ -19,30 +19,15 @@ public class SqlBuilderTests { System.out.println(SqlUtil.formatSql( SqlBuilder.select( TbAppFlinkJobConfig.ID_A, - TbAppCollectTableInfo.ALIAS_A, - TbAppHudiSyncState.MESSAGE_ID_A, - TbAppHudiSyncState.SOURCE_START_TIME_A, - TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_A, - TbAppHudiSyncState.SOURCE_PUBLISH_TIME_A, - TbAppHudiSyncState.SOURCE_OP_TIME_A, - TbAppHudiSyncState.COMPACTION_START_TIME_A, - TbAppHudiSyncState.COMPACTION_FINISH_TIME_A, - TbAppHudiSyncState.COMPACTION_APPLICATION_ID_A, - TbAppHudiSyncState.COMPACTION_STATUS_A, - TbAppHudiSyncState.COMPACTION_STATUS_TIME_A, - TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_A + TbAppFlinkJobConfig.NAME_A, + TbAppFlinkJobConfig.RUN_MODE_A, + TbAppYarnJobConfig.JOB_MANAGER_MEMORY_A, + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_A ) - .from( - TbAppFlinkJobConfig._alias_, - TbAppCollectTableInfo._alias_, - TbAppHudiSyncState._alias_ - ) - .whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A)) - .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) + .from(TbAppFlinkJobConfig._alias_, TbAppYarnJobConfig._alias_) + .whereEq(TbAppFlinkJobConfig.ONE_IN_ONE_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfig.ID_A)) .andEq(TbAppFlinkJobConfig.STATUS_A, "y") - .andEq(TbAppCollectTableInfo.STATUS_A, "y") .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) - .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias) .build() )); }