refactor(info-query): 转换flink job查询旧逻辑到新逻辑
This commit is contained in:
@@ -11,6 +11,7 @@ import cn.hutool.core.util.StrUtil;
|
|||||||
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
|
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
|
||||||
import com.eshore.odcp.hudi.connector.entity.SyncState;
|
import com.eshore.odcp.hudi.connector.entity.SyncState;
|
||||||
import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
||||||
|
import com.eshore.odcp.hudi.connector.exception.FlinkJobNotFoundException;
|
||||||
import com.eshore.odcp.hudi.connector.exception.SyncStateNotFoundException;
|
import com.eshore.odcp.hudi.connector.exception.SyncStateNotFoundException;
|
||||||
import com.eshore.odcp.hudi.connector.utils.database.DatabaseService;
|
import com.eshore.odcp.hudi.connector.utils.database.DatabaseService;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
|
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
|
||||||
@@ -37,7 +38,7 @@ import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*;
|
|||||||
* @author lanyuanxiaoyao
|
* @author lanyuanxiaoyao
|
||||||
* @date 2023-04-24
|
* @date 2023-04-24
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("OptionalGetWithoutIsPresent")
|
@SuppressWarnings({"OptionalGetWithoutIsPresent", "SqlSourceToSinkFlow"})
|
||||||
@CacheConfig(cacheManager = "normal-cache")
|
@CacheConfig(cacheManager = "normal-cache")
|
||||||
@Service
|
@Service
|
||||||
public class InfoService {
|
public class InfoService {
|
||||||
@@ -260,19 +261,59 @@ public class InfoService {
|
|||||||
@Cacheable(value = "job-metas", sync = true)
|
@Cacheable(value = "job-metas", sync = true)
|
||||||
@Retryable(Throwable.class)
|
@Retryable(Throwable.class)
|
||||||
public ImmutableList<JobAndMetas> jobAndMetas() {
|
public ImmutableList<JobAndMetas> jobAndMetas() {
|
||||||
return databaseService.findAllFlinkJob().collect(job -> new JobAndMetas(job, databaseService.findTableMeta(job.getId())));
|
return flinkJobs()
|
||||||
|
.collect(job -> new JobAndMetas(job, databaseService.findTableMeta(job.getId())));
|
||||||
|
}
|
||||||
|
|
||||||
|
private ImmutableList<FlinkJob> flinkJobs(Long flinkJobId) {
|
||||||
|
return Lists.immutable.ofAll(
|
||||||
|
mysqlJdbcTemplate.query(
|
||||||
|
SqlBuilder.select(
|
||||||
|
TbAppFlinkJobConfig.ID_A,
|
||||||
|
TbAppFlinkJobConfig.NAME_A,
|
||||||
|
TbAppFlinkJobConfig.RUN_MODE_A,
|
||||||
|
TbAppYarnJobConfig.JOB_MANAGER_MEMORY_A,
|
||||||
|
TbAppYarnJobConfig.TASK_MANAGER_MEMORY_A
|
||||||
|
)
|
||||||
|
.from(TbAppFlinkJobConfig._alias_, TbAppYarnJobConfig._alias_)
|
||||||
|
.whereEq(TbAppFlinkJobConfig.ONE_IN_ONE_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfig.ID_A))
|
||||||
|
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
|
||||||
|
.andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
|
||||||
|
.build(),
|
||||||
|
(rs, row) -> {
|
||||||
|
String runModeText = rs.getString(3);
|
||||||
|
FlinkJob.RunMode mode;
|
||||||
|
try {
|
||||||
|
mode = FlinkJob.RunMode.valueOf(runModeText);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
mode = FlinkJob.RunMode.ALL_IN_ONE;
|
||||||
|
}
|
||||||
|
TableMeta.YarnMeta yarnMeta = TableMeta.YarnMeta.builder()
|
||||||
|
.jobManagerMemory(rs.getInt(4))
|
||||||
|
.taskManagerMemory(rs.getInt(5))
|
||||||
|
.build();
|
||||||
|
return FlinkJob.builder()
|
||||||
|
.id(rs.getLong(1))
|
||||||
|
.name(rs.getString(2))
|
||||||
|
.runMode(mode)
|
||||||
|
.oneInOneSyncYarn(yarnMeta)
|
||||||
|
.build();
|
||||||
|
})
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Cacheable(value = "flink-jobs", sync = true)
|
@Cacheable(value = "flink-jobs", sync = true)
|
||||||
@Retryable(Throwable.class)
|
@Retryable(Throwable.class)
|
||||||
public ImmutableList<FlinkJob> flinkJobs() {
|
public ImmutableList<FlinkJob> flinkJobs() {
|
||||||
return databaseService.findAllFlinkJob();
|
return flinkJobs(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Cacheable(value = "flink-jobs", sync = true, key = "#flinkJobId")
|
@Cacheable(value = "flink-jobs", sync = true, key = "#flinkJobId")
|
||||||
@Retryable(Throwable.class)
|
@Retryable(Throwable.class)
|
||||||
public FlinkJob flinkJob(Long flinkJobId) {
|
public FlinkJob flinkJob(Long flinkJobId) {
|
||||||
return databaseService.getFlinkJob(flinkJobId);
|
return flinkJobs(flinkJobId)
|
||||||
|
.getFirstOptional()
|
||||||
|
.orElseThrow(FlinkJobNotFoundException::new);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Cacheable(value = "table-metas", sync = true)
|
@Cacheable(value = "table-metas", sync = true)
|
||||||
|
|||||||
@@ -19,30 +19,15 @@ public class SqlBuilderTests {
|
|||||||
System.out.println(SqlUtil.formatSql(
|
System.out.println(SqlUtil.formatSql(
|
||||||
SqlBuilder.select(
|
SqlBuilder.select(
|
||||||
TbAppFlinkJobConfig.ID_A,
|
TbAppFlinkJobConfig.ID_A,
|
||||||
TbAppCollectTableInfo.ALIAS_A,
|
TbAppFlinkJobConfig.NAME_A,
|
||||||
TbAppHudiSyncState.MESSAGE_ID_A,
|
TbAppFlinkJobConfig.RUN_MODE_A,
|
||||||
TbAppHudiSyncState.SOURCE_START_TIME_A,
|
TbAppYarnJobConfig.JOB_MANAGER_MEMORY_A,
|
||||||
TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_A,
|
TbAppYarnJobConfig.TASK_MANAGER_MEMORY_A
|
||||||
TbAppHudiSyncState.SOURCE_PUBLISH_TIME_A,
|
|
||||||
TbAppHudiSyncState.SOURCE_OP_TIME_A,
|
|
||||||
TbAppHudiSyncState.COMPACTION_START_TIME_A,
|
|
||||||
TbAppHudiSyncState.COMPACTION_FINISH_TIME_A,
|
|
||||||
TbAppHudiSyncState.COMPACTION_APPLICATION_ID_A,
|
|
||||||
TbAppHudiSyncState.COMPACTION_STATUS_A,
|
|
||||||
TbAppHudiSyncState.COMPACTION_STATUS_TIME_A,
|
|
||||||
TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_A
|
|
||||||
)
|
)
|
||||||
.from(
|
.from(TbAppFlinkJobConfig._alias_, TbAppYarnJobConfig._alias_)
|
||||||
TbAppFlinkJobConfig._alias_,
|
.whereEq(TbAppFlinkJobConfig.ONE_IN_ONE_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfig.ID_A))
|
||||||
TbAppCollectTableInfo._alias_,
|
|
||||||
TbAppHudiSyncState._alias_
|
|
||||||
)
|
|
||||||
.whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
|
|
||||||
.andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
|
|
||||||
.andEq(TbAppFlinkJobConfig.STATUS_A, "y")
|
.andEq(TbAppFlinkJobConfig.STATUS_A, "y")
|
||||||
.andEq(TbAppCollectTableInfo.STATUS_A, "y")
|
|
||||||
.andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
|
.andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
|
||||||
.andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias)
|
|
||||||
.build()
|
.build()
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user