diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/ScheduleService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/ScheduleService.java index d379050..9ca497d 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/ScheduleService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/ScheduleService.java @@ -1,8 +1,11 @@ package com.lanyuanxiaoyao.service.forest.service; import com.dtflys.forest.annotation.BaseRequest; +import com.dtflys.forest.annotation.Body; import com.dtflys.forest.annotation.Get; +import com.dtflys.forest.annotation.Post; import com.dtflys.forest.annotation.Query; +import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.configuration.entity.schedule.ScheduleStrategy; import org.eclipse.collections.api.list.ImmutableList; @@ -36,6 +39,9 @@ public interface ScheduleService { @Get("/table") void scheduleTableRecommend(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("recommend") String recommendCluster); + @Post(value = "/tables", contentType = "application/json") + void scheduleTables(@Body ImmutableList list); + @Get("/stop_all") void stopAll(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("disable_meta") Boolean disableMeta); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/controller/ScheduleController.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/controller/ScheduleController.java index d3153bb..d1ba14f 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/controller/ScheduleController.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/controller/ScheduleController.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.lanyuanxiaoyao.service.common.Constants; +import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil; import com.lanyuanxiaoyao.service.forest.service.HudiService; import com.lanyuanxiaoyao.service.forest.service.InfoService; @@ -34,6 +35,8 @@ import org.slf4j.LoggerFactory; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @@ -141,6 +144,22 @@ public class ScheduleController { ); } + @PostMapping("tables") + public void scheduleTables(@RequestBody ImmutableList list) { + if (list.isEmpty()) { + return; + } + ImmutableList keys = list.collect(item -> StrUtil.format("{}-{}", item.getId(), item.getAlias())); + ScheduleHelper.schedule( + discoveryClient, + infoService, + hudiService, + mapper, + meta -> keys.contains(StrUtil.format("{}-{}", meta.getJob().getId(), meta.getAlias())), + "Schedule manually" + ); + } + @GetMapping("stop_all") public void stopTheWorld( @RequestParam("flink_job_id") Long flinkJobId, diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DailyScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DailyScheduleJob.java index 254c216..d226007 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DailyScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DailyScheduleJob.java @@ -45,8 +45,7 @@ public class DailyScheduleJob extends BaseScheduleJob { hudiService, mapper, meta -> true, - comment, - Maps.immutable.empty() + comment ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java index 5ea0aad..cdc7837 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java @@ -45,8 +45,7 @@ public class FocusScheduleJob extends BaseScheduleJob { hudiService, mapper, meta -> meta.getPriority() >= 10000, - comment, - Maps.immutable.empty() + comment ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java index 2aac8a2..a8df519 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java @@ -49,8 +49,7 @@ public class FocusUnVersionUpdateScheduleJob extends BaseScheduleJob { mapper, meta -> meta.getPriority() >= 10000 && unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getJob().getId(), meta.getAlias())), - comment, - Maps.immutable.empty() + comment ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java index f976a88..b43feae 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java @@ -47,8 +47,7 @@ public class OdsFocusScheduleJob extends BaseScheduleJob { hudiService, mapper, meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_ODS_FOCUS), - comment, - Maps.immutable.empty() + comment ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java index c6b8539..b1f8d45 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java @@ -37,6 +37,17 @@ public class ScheduleHelper { return LabelMarker.of(() -> Maps.mutable.of(Constants.LOG_FLINK_JOB_ID, flinkJobId.toString(), Constants.LOG_ALIAS, alias)); } + public static void schedule( + DiscoveryClient discoveryClient, + InfoService infoService, + HudiService hudiService, + ObjectMapper mapper, + Predicate predicate, + String comment + ) { + schedule(discoveryClient, infoService, hudiService, mapper, predicate, comment, Maps.immutable.empty()); + } + public static void schedule( DiscoveryClient discoveryClient, InfoService infoService, diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/ScheduleController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/ScheduleController.java index 1303c60..270b308 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/ScheduleController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/ScheduleController.java @@ -47,7 +47,7 @@ public class ScheduleController extends BaseController { for (JobIdAndAlias jobIdAndAlias : jobIdAndAliases) { checkTableExists(infoService, jobIdAndAlias.getId(), jobIdAndAlias.getAlias()); } - jobIdAndAliases.forEach(item -> scheduleService.scheduleTable(item.getId(), item.getAlias())); + scheduleService.scheduleTables(jobIdAndAliases); return AmisResponse.responseSuccess(); }