diff --git a/bin/build-scheduler.sh b/bin/build-scheduler.sh
new file mode 100755
index 0000000..05631eb
--- /dev/null
+++ b/bin/build-scheduler.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+mvn -pl service-dependencies,service-configuration clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml
+mvn -pl service-scheduler clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml
+ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-scheduler/target/service-scheduler-1.0.0-SNAPSHOT.jar
diff --git a/pom.xml b/pom.xml
index 93dd0aa..64a28c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,6 +29,7 @@ service-exporter service-check service-api + service-scheduler
diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/utils/QueueUtil.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/utils/QueueUtil.java
new file mode 100644
index 0000000..0868fcb
--- /dev/null
+++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/utils/QueueUtil.java
@@ -0,0 +1,197 @@
+package com.lanyuanxiaoyao.service.configuration.utils;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.util.URLUtil;
+import cn.hutool.http.HttpResponse;
+import cn.hutool.http.HttpUtil;
+import com.eshore.odcp.hudi.connector.Constants;
+import com.eshore.odcp.hudi.connector.entity.compaction.ScheduleJob;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import java.time.Duration;
+import java.util.List;
+import org.eclipse.collections.api.factory.Lists;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.client.ServiceInstance;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+
+/**
+ * Queue operations: static helpers that call the HTTP endpoints of the
+ * {@code service-queue} instance found through the discovery client.
+ * Every request uses HTTP basic auth with the credentials from {@link Constants}.
+ *
+ * @author lanyuanxiaoyao
+ * @date 2023-05-08
+ */
+public class QueueUtil {
+    private static final Logger logger = LoggerFactory.getLogger(QueueUtil.class);
+    /** Retry on any throwable: at most 12 attempts with a fixed 5 second delay. */
+    private static final RetryPolicy<Object> RETRY = RetryPolicy.builder()
+        .handle(Throwable.class)
+        .withDelay(Duration.ofSeconds(5))
+        .withMaxAttempts(12)
+        .build();
+
+    /**
+     * Returns the URI of the first registered {@code service-queue} instance.
+     *
+     * @throws RuntimeException if no instance is registered; the known service names are logged first
+     */
+    private static String getQueueUrl(DiscoveryClient client) {
+        List<ServiceInstance> instances = client.getInstances("service-queue");
+        if (ObjectUtil.isEmpty(instances)) {
+            logger.info("Current instances: {}", String.join(",", client.getServices()));
+            throw new RuntimeException("Queue instance not found");
+        }
+        return instances.get(0).getUri().toString();
+    }
+
+    /**
+     * Fetches {@code /names} with retries.
+     * NOTE(review): the element type was lost in extraction; {@code String} is assumed - confirm.
+     *
+     * @return the deserialized response body, or an empty list when the response is not OK;
+     * a failure that survives all retries is not caught here
+     */
+    public static ImmutableList<String> names(DiscoveryClient client, ObjectMapper mapper) {
+        return Failsafe.with(RETRY)
+            .get(() -> {
+                try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(getQueueUrl(client), "/names"))
+                    .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
+                    .execute()) {
+                    if (response.isOk()) {
+                        return mapper.readValue(response.body(), new TypeReference<ImmutableList<String>>() {
+                        });
+                    }
+                    return Lists.immutable.empty();
+                }
+            });
+    }
+
+    /**
+     * Serializes {@code job} to JSON and posts it to {@code /queue/add} of queue {@code name}, with retries.
+     *
+     * @return the boolean parsed from the response body; {@code false} when the job is null, the
+     * serialized body is blank, the response is not OK, or any throwable is raised (it is logged)
+     */
+    public static Boolean add(DiscoveryClient client, ObjectMapper mapper, String name, QueueItem<ScheduleJob> job) {
+        try {
+            if (ObjectUtil.isNull(job)) {
+                logger.warn("Job cannot be null");
+                return false;
+            }
+            String body = mapper.writeValueAsString(job);
+            if (StrUtil.isBlank(body)) {
+                logger.error("Body cannot be blank");
+                return false;
+            }
+            return Failsafe.with(RETRY)
+                .get(() -> {
+                    try (HttpResponse response = HttpUtil.createPost(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/add?name={}", name)))
+                        .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
+                        .body(body)
+                        .execute()) {
+                        if (response.isOk()) {
+                            return Boolean.valueOf(response.body());
+                        }
+                        return false;
+                    }
+                });
+        } catch (Throwable e) {
+            logger.error("Add job to queue error", e);
+        }
+        return false;
+    }
+
+    /**
+     * Polls one item from queue {@code name} through {@code /queue/poll}, with retries.
+     *
+     * @return the parsed item, or {@code null} when the response is not OK, the body is blank,
+     * the body cannot be parsed, or the request fails (each failure is logged)
+     */
+    public static QueueItem<ScheduleJob> poll(DiscoveryClient client, ObjectMapper mapper, String name) {
+        try {
+            return Failsafe.with(RETRY)
+                .get(() -> {
+                    try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/poll?name={}", name)))
+                        .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
+                        .execute()) {
+                        if (response.isOk()) {
+                            String body = response.body();
+                            if (StrUtil.isNotBlank(body)) {
+                                try {
+                                    return mapper.readValue(body, new TypeReference<QueueItem<ScheduleJob>>() {
+                                    });
+                                } catch (Throwable error) {
+                                    // A malformed body is logged and treated as "nothing polled" instead of being retried.
+                                    logger.error("Schedule job parse error", error);
+                                }
+                            }
+                        }
+                        return null;
+                    }
+                });
+        } catch (Throwable e) {
+            logger.error("Poll job from queue error", e);
+        }
+        return null;
+    }
+
+    /**
+     * Asks {@code /queue/is_empty} whether queue {@code name} is empty (single attempt, no retry).
+     *
+     * @return the boolean parsed from the body; {@code true} when the response is not OK, the body
+     * is blank, or the request fails
+     */
+    public static Boolean isEmpty(DiscoveryClient client, String name) {
+        try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/is_empty?name={}", name)))
+            .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
+            .execute()) {
+            if (response.isOk()) {
+                String body = response.body();
+                if (StrUtil.isNotBlank(body)) {
+                    return Boolean.valueOf(body);
+                }
+            }
+            return true;
+        } catch (Throwable e) {
+            logger.error("Query empty failure", e);
+        }
+        return true;
+    }
+
+    /**
+     * Asks {@code /queue/size} for the size of queue {@code name} (single attempt, no retry).
+     *
+     * @return the integer parsed from the body; {@code -1} when the response is not OK, the body
+     * is blank, or the request or parsing fails
+     */
+    public static Integer size(DiscoveryClient client, String name) {
+        try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/size?name={}", name)))
+            .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
+            .execute()) {
+            if (response.isOk()) {
+                String body = response.body();
+                if (StrUtil.isNotBlank(body)) {
+                    return Integer.valueOf(body);
+                }
+            }
+            return -1;
+        } catch (Throwable e) {
+            // The message used to be the copy-pasted "Query empty failure".
+            logger.error("Query size failure", e);
+        }
+        return -1;
+    }
+
+    /**
+     * Removes every entry with the given {@code id} from queue {@code name} through a GET on
+     * {@code /queue/remove_all_id} (single attempt, no retry).
+     * NOTE(review): a non-OK response yields {@code true} while an exception yields {@code false};
+     * confirm this asymmetry is intended.
+     *
+     * @return the boolean parsed from the body when the response is OK
+     */
+    public static Boolean remove(DiscoveryClient client, String name, String id) {
+        try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/remove_all_id?name={}&id={}", name, id)))
+            .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
+            .execute()) {
+            if (response.isOk()) {
+                return Boolean.valueOf(response.body());
+            }
+            return true;
+        } catch (Throwable e) {
+            logger.error("Remove {} failure", id, e);
+        }
+        return false;
+    }
+
+    /**
+     * Removes every entry whose id is in {@code ids} from queue {@code name} by posting the
+     * JSON-serialized ids to {@code /queue/remove_all_id} (single attempt, no retry).
+     * NOTE(review): the element type was lost in extraction; {@code String} is assumed - confirm.
+     * NOTE(review): as in the single-id overload, a non-OK response yields {@code true}.
+     *
+     * @return the boolean parsed from the body when the response is OK; {@code false} when the
+     * serialized body is blank or the request fails
+     */
+    public static Boolean remove(DiscoveryClient client, ObjectMapper mapper, String name, ImmutableList<String> ids) {
+        try {
+            String body = mapper.writeValueAsString(ids);
+            if (StrUtil.isBlank(body)) {
+                logger.error("Body cannot be blank");
+                return false;
+            }
+            try (HttpResponse response = HttpUtil.createPost(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/remove_all_id?name={}", name)))
+                .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
+                .body(body)
+                .execute()) {
+                if (response.isOk()) {
+                    return Boolean.valueOf(response.body());
+                }
+                return true;
+            }
+        } catch (Throwable e) {
+            logger.error("Remove {} failure", ids, e);
+        }
+        return false;
+    }
+
+    /**
+     * A function from {@code F} to {@code T} that may throw.
+     * NOTE(review): the type-parameter list was lost in extraction; the order {@code <F, T>} is assumed - confirm.
+     */
+    public interface Function<F, T> {
+        T apply(F f) throws Throwable;
+    }
+}
diff --git a/service-scheduler/pom.xml b/service-scheduler/pom.xml
new file mode 100644
index 0000000..fa1e251
--- /dev/null
+++ b/service-scheduler/pom.xml
@@ -0,0 +1,40 @@ + + 4.0.0 + + com.lanyuanxiaoyao + hudi-service + 1.0.0-SNAPSHOT + + + service-scheduler + + + + org.springframework.boot + spring-boot-starter-quartz + + + com.lanyuanxiaoyao + service-forest + 1.0.0-SNAPSHOT + + + com.cronutils + cron-utils + 9.2.0 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + +
\ No newline at end of file
diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/SchedulerApplication.java 
b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/SchedulerApplication.java
new file mode 100644
index 0000000..f1b8331
--- /dev/null
+++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/SchedulerApplication.java
@@ -0,0 +1,72 @@
+package com.lanyuanxiaoyao.service.scheduler;
+
+import cn.hutool.core.thread.ThreadUtil;
+import com.lanyuanxiaoyao.service.scheduler.strategy.ScheduleStrategy;
+import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.quartz.Scheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.ComponentScans;
+
+/**
+ * Application entry point of the scheduler service. After start-up it registers the
+ * job and trigger of every enabled {@link ScheduleStrategy} with the Quartz scheduler.
+ *
+ * @author ZhangJiacheng
+ * @date 2023-05-11
+ */
+@EnableDiscoveryClient
+@SpringBootApplication(exclude = {GsonAutoConfiguration.class})
+@ComponentScans({
+    @ComponentScan("com.lanyuanxiaoyao.service"),
+})
+@EnableConfigurationProperties
+@EnableEncryptableProperties
+public class SchedulerApplication implements ApplicationRunner {
+    private static final Logger logger = LoggerFactory.getLogger(SchedulerApplication.class);
+    private final DiscoveryClient client;
+    private final Scheduler quartzScheduler;
+    private final ImmutableList<ScheduleStrategy> strategies;
+    // The ${...} placeholder is required: without it Spring injects the literal text
+    // "spring.application.name" and the single-instance check in run() can never match.
+    @Value("${spring.application.name}")
+    private String application;
+
+    public SchedulerApplication(DiscoveryClient client, Scheduler quartzScheduler, ImmutableList<ScheduleStrategy> strategies) {
+        this.client = client;
+        this.quartzScheduler = quartzScheduler;
+        this.strategies = strategies;
+    }
+
+    public static void main(String[] args) {
+        SpringApplication.run(SchedulerApplication.class, args);
+    }
+
+    /**
+     * Exits with status 1 (after a 15 second wait) when the discovery client already lists a
+     * service with this application's name; otherwise schedules every enabled strategy.
+     *
+     * @throws Exception if Quartz rejects a job/trigger pair
+     */
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+        // The queue does not support distributed start-up; only one instance may run.
+        if (client.getServices().contains(application)) {
+            logger.error("Found {} exists, exit...", application);
+            logger.warn("Wait for 15s and exit");
+            ThreadUtil.safeSleep(15000);
+            System.exit(1);
+        }
+
+        for (ScheduleStrategy strategy : strategies) {
+            if (strategy.enable()) {
+                logger.info("Schedule {} ({})", strategy.name(), strategy.comment());
+                quartzScheduler.scheduleJob(strategy.job(), strategy.trigger());
+            } else {
+                logger.warn("Strategy {} ({}) is disabled", strategy.name(), strategy.comment());
+            }
+        }
+    }
+}
diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/configuration/ScheduleStrategyProvider.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/configuration/ScheduleStrategyProvider.java
new file mode 100644
index 0000000..2f35dc9
--- /dev/null
+++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/configuration/ScheduleStrategyProvider.java
@@ -0,0 +1,40 @@
+package com.lanyuanxiaoyao.service.scheduler.configuration;
+
+import com.lanyuanxiaoyao.service.scheduler.quartz.compaction.*;
+import com.lanyuanxiaoyao.service.scheduler.strategy.ScheduleStrategy;
+import com.lanyuanxiaoyao.service.scheduler.strategy.ScheduleStrategyImpl;
+import com.lanyuanxiaoyao.service.scheduler.strategy.ScheduleStrategyVO;
+import org.eclipse.collections.api.factory.Lists;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 
@author ZhangJiacheng
+ * @date 2023-06-06
+ */
+@Configuration
+public class ScheduleStrategyProvider {
+    /**
+     * Declares every schedule strategy known to the scheduler: a name, a display comment,
+     * the Quartz job class and the cron expression that triggers it.
+     * NOTE(review): the meaning of the leading {@code false} argument on two entries is
+     * defined by {@code ScheduleStrategyImpl.simple}, which is not visible here - confirm.
+     */
+    @Bean
+    public ImmutableList<ScheduleStrategy> strategies() {
+        return Lists.immutable.of(
+            ScheduleStrategyImpl.simple(false, "distribute_schedule", "定时分布式调度", DistributeScheduleJob.class, "0/2 * * * * ?"),
+            // Regular schedule
+            ScheduleStrategyImpl.simple("daily_schedule", "普通全表调度", DailyScheduleJob.class, "0 50 1,4,7,10,13,16,19 * * ?"),
+            // Focus-table schedule
+            ScheduleStrategyImpl.simple("focus_evening_schedule", "晚间重点表调度", FocusScheduleJob.class, "0 50 20,21,22 * * ?"),
+            // ODS focus-table schedule
+            ScheduleStrategyImpl.simple("ods_focus_schedule", "ODS 重点表调度", OdsFocusScheduleJob.class, "0 30 23 * * ?"),
+            // Busy-hour schedule
+            ScheduleStrategyImpl.simple("focus_schedule", "重点表跨天调度", FocusUnVersionUpdateScheduleJob.class, "0 0,10,20,40 0,1 * * ?"),
+            ScheduleStrategyImpl.simple(false, "remove_scheduled", "跨天调度时及时删除已完成跨天的任务", RemoveScheduledJob.class, "0 * 0,1 * * ?")
+        );
+    }
+
+    /**
+     * Builds the view objects of the enabled strategies.
+     *
+     * @param strategies all declared strategies
+     * @return one {@link ScheduleStrategyVO} per strategy whose {@code enable()} is true
+     */
+    @Bean
+    public ImmutableList<ScheduleStrategyVO> strategyVOS(ImmutableList<ScheduleStrategy> strategies) {
+        return strategies
+            .select(ScheduleStrategy::enable)
+            .collect(ScheduleStrategyVO::new);
+    }
+}
diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/controller/ScheduleController.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/controller/ScheduleController.java
new file mode 100644
index 0000000..c09cd76
--- /dev/null
+++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/controller/ScheduleController.java
@@ -0,0 +1,155 @@
+package com.lanyuanxiaoyao.service.scheduler.controller;
+
+import cn.hutool.core.util.NumberUtil;
+import cn.hutool.core.util.StrUtil;
+import com.cronutils.model.Cron;
+import com.cronutils.model.CronType;
+import com.cronutils.model.definition.CronDefinitionBuilder;
+import com.cronutils.model.time.ExecutionTime;
+import com.cronutils.parser.CronParser;
+import com.eshore.odcp.hudi.connector.Constants;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import 
com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
+import com.lanyuanxiaoyao.service.forest.service.HudiService;
+import com.lanyuanxiaoyao.service.forest.service.InfoService;
+import com.lanyuanxiaoyao.service.forest.service.launcher.LaunchersService;
+import com.lanyuanxiaoyao.service.scheduler.quartz.compaction.DailyScheduleJob;
+import com.lanyuanxiaoyao.service.scheduler.quartz.compaction.FocusScheduleJob;
+import com.lanyuanxiaoyao.service.scheduler.quartz.compaction.FocusUnVersionUpdateScheduleJob;
+import com.lanyuanxiaoyao.service.scheduler.strategy.ScheduleStrategyVO;
+import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper;
+import java.time.Duration;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import org.eclipse.collections.api.factory.Lists;
+import org.eclipse.collections.api.factory.Maps;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.eclipse.collections.api.list.MutableList;
+import org.eclipse.collections.api.map.MutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * External HTTP interface of the scheduler: lists the enabled strategies and lets a caller
+ * trigger or stop compaction scheduling manually.
+ *
+ * @author ZhangJiacheng
+ * @date 2023-05-12
+ */
+@RestController
+@RequestMapping("schedule")
+public class ScheduleController {
+    private static final Logger logger = LoggerFactory.getLogger(ScheduleController.class);
+
+    private final DiscoveryClient discoveryClient;
+    private final InfoService infoService;
+    private final HudiService hudiService;
+    private final ImmutableList<ScheduleStrategyVO> strategyVOS;
+    private final LaunchersService launchersService;
+    private final ObjectMapper mapper;
+    /**
+     * Maps a Quartz cron expression to all of its firing times within the current day
+     * (from 00:00:00 inclusive to the next 00:00:00 exclusive); entries expire 10 minutes after being written.
+     */
+    private final LoadingCache<String, ImmutableList<ZonedDateTime>> scheduleJobTimeCache = Caffeine.newBuilder()
+        .expireAfterWrite(Duration.of(10, ChronoUnit.MINUTES))
+        .build(cronExpression -> {
+            MutableList<ZonedDateTime> times = Lists.mutable.empty();
+            CronParser cronParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ));
+            ZonedDateTime startOfDay = ZonedDateTime.now().withHour(0).withMinute(0).withSecond(0).withNano(0);
+            ZonedDateTime startOfNextDay = startOfDay.plusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
+            Cron cron = cronParser.parse(cronExpression);
+            ExecutionTime executionTime = ExecutionTime.forCron(cron);
+            // nextExecution only returns instants strictly after its argument, so start one second
+            // before midnight; otherwise a firing at exactly 00:00:00 would be dropped.
+            Optional<ZonedDateTime> zonedDateTime = executionTime.nextExecution(startOfDay.minusSeconds(1));
+            while (zonedDateTime.isPresent() && zonedDateTime.get().isBefore(startOfNextDay)) {
+                times.add(zonedDateTime.get());
+                zonedDateTime = executionTime.nextExecution(zonedDateTime.get());
+            }
+            return times.toImmutable();
+        });
+
+    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
+    public ScheduleController(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ImmutableList<ScheduleStrategyVO> strategyVOS, LaunchersService launchersService, Jackson2ObjectMapperBuilder builder) {
+        this.discoveryClient = discoveryClient;
+        this.infoService = infoService;
+        this.hudiService = hudiService;
+        this.strategyVOS = strategyVOS;
+        this.launchersService = launchersService;
+
+        this.mapper = builder.build();
+    }
+
+    /** Returns the view objects of the enabled schedule strategies. */
+    @GetMapping("schedule_jobs")
+    public ImmutableList<ScheduleStrategyVO> scheduleJobs() {
+        return strategyVOS;
+    }
+
+    /**
+     * Currently always returns an empty list; the intended implementation is kept below, disabled.
+     * NOTE(review): the disabled code refers to a {@code strategies} field this class does not
+     * have, and the element type {@code Long} is inferred from its epoch-millisecond mapping - confirm.
+     */
+    @GetMapping("schedule_times")
+    public ImmutableList<Long> scheduleJobTimes() {
+        /*return strategies
+            .select(ScheduleStrategy::enable)
+            .select(ScheduleStrategy::show)
+            .select(strategy -> (strategy.trigger()) instanceof CronTrigger)
+            .flatCollect(strategy -> scheduleJobTimeCache.get(((CronTrigger) strategy.trigger()).getCronExpression()))
+            .toSortedList(ZonedDateTime::compareTo)
+            .collect(time -> time.toInstant().toEpochMilli())
+            .toImmutable();*/
+        return Lists.immutable.empty();
+    }
+
+    /** Manually runs the daily schedule over all tables. */
+    @GetMapping("all")
+    public void scheduleAllTable() {
+        DailyScheduleJob.schedule(discoveryClient, infoService, hudiService, mapper, "Daily schedule manually");
+    }
+
+    /** Manually runs the focus-table schedule. */
+    @GetMapping("all_focus")
+    public void scheduleAllFocus() {
+        FocusScheduleJob.schedule(discoveryClient, infoService, hudiService, mapper, "Focus schedule manually");
+    }
+
+    /** Manually runs the schedule for focus tables whose version has not been updated. */
+    @GetMapping("all_un_scheduled")
+    public void scheduleAllUnScheduledTable() {
+        FocusUnVersionUpdateScheduleJob.schedule(discoveryClient, infoService, hudiService, mapper, "Un-updated version schedule manually");
+    }
+
+    /**
+     * Manually schedules the single table identified by flink job id and alias.
+     *
+     * @param forceCluster optional; stored under {@code Constants.SCHEDULE_FORCE} when not blank
+     * @param recommendCluster optional; stored under {@code Constants.SCHEDULE_RECOMMEND} when not blank
+     */
+    @GetMapping("table")
+    public void scheduleTable(
+        @RequestParam("flink_job_id") Long flinkJobId,
+        @RequestParam("alias") String alias,
+        @RequestParam(value = "force", required = false) String forceCluster,
+        @RequestParam(value = "recommend", required = false) String recommendCluster
+    ) {
+        MutableMap<String, String> metadata = Maps.mutable.empty();
+        if (StrUtil.isNotBlank(forceCluster)) {
+            metadata.put(Constants.SCHEDULE_FORCE, forceCluster);
+        }
+        if (StrUtil.isNotBlank(recommendCluster)) {
+            metadata.put(Constants.SCHEDULE_RECOMMEND, recommendCluster);
+        }
+        ScheduleHelper.schedule(
+            discoveryClient,
+            infoService,
+            hudiService,
+            mapper,
+            meta -> StrUtil.equals(meta.getAlias(), alias) && NumberUtil.equals(meta.getJob().getId(), flinkJobId),
+            "Schedule manually",
+            metadata.toImmutable()
+        );
+    }
+
+    /**
+     * Removes the table's entry from the pre-schedule queue and asks the launcher service to
+     * stop its compaction.
+     *
+     * @param disableMeta passed through to {@code compactionStop}; defaults to {@code true}
+     */
+    @GetMapping("stop_all")
+    public void stopTheWorld(
+        @RequestParam("flink_job_id") Long flinkJobId,
+        @RequestParam("alias") String alias,
+        @RequestParam(value = "disable_meta", defaultValue = "true") Boolean disableMeta
+    ) {
+        QueueUtil.remove(discoveryClient, Constants.COMPACTION_QUEUE_PRE, StrUtil.format("{}-{}", flinkJobId, alias));
+        launchersService.compactionStop(flinkJobId, alias, disableMeta);
+        // Remove once more so that a timing gap cannot put the task back into the pre-schedule queue.
+        QueueUtil.remove(discoveryClient, Constants.COMPACTION_QUEUE_PRE, StrUtil.format("{}-{}", flinkJobId, alias));
+    }
+}
diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/BaseScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/BaseScheduleJob.java
new file mode 100644
index 0000000..7872dd6
--- /dev/null
+++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/BaseScheduleJob.java
@@ -0,0 +1,15 @@
+package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.quartz.QuartzJobBean;
+
+/**
+ * Common base class of the compaction Quartz jobs; a place for shared helper methods.
+ *
+ * @author ZhangJiacheng
+ * @date 2023-05-11
+ */
+public abstract class BaseScheduleJob extends QuartzJobBean {
+    private static final Logger logger = LoggerFactory.getLogger(BaseScheduleJob.class);
+}
diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DailyScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DailyScheduleJob.java
new file mode 100644
index 0000000..254c216
--- /dev/null
+++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DailyScheduleJob.java
@@ -0,0 +1,58 @@
+package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.lanyuanxiaoyao.service.forest.service.HudiService;
+import com.lanyuanxiaoyao.service.forest.service.InfoService;
+import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import org.eclipse.collections.api.factory.Maps;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.JobExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import 
org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
+
+/**
+ * Regular daily schedule: schedules every table, without any filter.
+ *
+ * @author ZhangJiacheng
+ * @date 2023-05-11
+ */
+@DisallowConcurrentExecution
+public class DailyScheduleJob extends BaseScheduleJob {
+    private static final Logger logger = LoggerFactory.getLogger(DailyScheduleJob.class);
+    /** Timestamp format used in the schedule comment; built once instead of on every execution. */
+    private static final DateTimeFormatter COMMENT_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    private final DiscoveryClient discoveryClient;
+    private final InfoService infoService;
+    private final HudiService hudiService;
+    private final ObjectMapper mapper;
+
+    public DailyScheduleJob(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, Jackson2ObjectMapperBuilder builder) {
+        this.discoveryClient = discoveryClient;
+        this.infoService = infoService;
+        this.hudiService = hudiService;
+
+        this.mapper = builder.build();
+    }
+
+    /**
+     * Hands every table to {@link ScheduleHelper#schedule} (the filter accepts all) with empty metadata.
+     *
+     * @param comment free-text label passed through to the helper
+     */
+    public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) {
+        logger.info("Daily schedule");
+        ScheduleHelper.schedule(
+            discoveryClient,
+            infoService,
+            hudiService,
+            mapper,
+            meta -> true,
+            comment,
+            Maps.immutable.empty()
+        );
+    }
+
+    /** Quartz entry point: runs the daily schedule, labelled with the current local time. */
+    @Override
+    protected void executeInternal(JobExecutionContext context) {
+        String now = LocalDateTime.now().format(COMMENT_TIME_FORMATTER);
+        schedule(discoveryClient, infoService, hudiService, mapper, "Daily schedule for " + now);
+    }
+}
diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DistributeScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DistributeScheduleJob.java
new file mode 100644
index 0000000..c5ab626
--- /dev/null
+++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DistributeScheduleJob.java
@@ -0,0 +1,127 @@
+package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
+
+import 
cn.hutool.core.thread.ThreadUtil;
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import com.eshore.odcp.hudi.connector.Constants;
+import com.eshore.odcp.hudi.connector.entity.compaction.ScheduleJob;
+import com.eshore.odcp.hudi.connector.utils.LogHelper;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
+import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnClusters;
+import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
+import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster.Cluster;
+import java.util.Optional;
+import org.eclipse.collections.api.factory.Lists;
+import org.eclipse.collections.api.factory.Maps;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.eclipse.collections.api.list.MutableList;
+import org.eclipse.collections.api.map.ImmutableMap;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.JobExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.context.ApplicationContext;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
+
+/**
+ * Distributes compaction tasks: takes one item from the pre-schedule queue per execution
+ * and moves it to the queue of a cluster that can take it.
+ *
+ * @author lanyuanxiaoyao
+ * @date 2023-05-25
+ */
+@DisallowConcurrentExecution
+public class DistributeScheduleJob extends BaseScheduleJob {
+    private static final Logger logger = LoggerFactory.getLogger(DistributeScheduleJob.class);
+    /** Metadata key holding how many times distribution of an item has been retried. */
+    private static final String DISTRIBUTE_SCHEDULE_RETRY = "distribute_schedule_retry";
+
+    private final DiscoveryClient discoveryClient;
+    private final ObjectMapper mapper;
+    private final ImmutableList<Cluster> clusters;
+    private final String defaultCluster;
+
+    /**
+     * Looks up one {@link Cluster} bean per active cluster; the bean name is the camel-cased
+     * cluster name followed by {@code Cluster}.
+     */
+    public DistributeScheduleJob(DiscoveryClient discoveryClient, YarnClusters yarnClusters, ApplicationContext applicationContext, Jackson2ObjectMapperBuilder builder) {
+        this.discoveryClient = discoveryClient;
+
+        this.mapper = builder.build();
+        MutableList<Cluster> list = Lists.mutable.empty();
+        for (String cluster : yarnClusters.getActiveClusters()) {
+            list.add(applicationContext.getBean(StrUtil.format("{}Cluster", StrUtil.toCamelCase(cluster)), Cluster.class));
+        }
+        this.clusters = list.toImmutable();
+        this.defaultCluster = yarnClusters.getDefaultSyncCluster();
+    }
+
+    /**
+     * Chooses the target queue for an item, in this order: the forced cluster if it is known
+     * (its availability is not checked), the recommended cluster if it is known and available,
+     * then the first available cluster.
+     *
+     * @return the queue name, or {@code null} when no cluster can take the item
+     */
+    private String getAvailableQueue(QueueItem<ScheduleJob> item) {
+        ImmutableMap<String, String> metadata = Maps.immutable.ofAll(item.getMetadata());
+        if (metadata.containsKey(Constants.SCHEDULE_FORCE)) {
+            String forceCluster = metadata.get(Constants.SCHEDULE_FORCE);
+            Optional<Cluster> cluster = clusters.select(s -> StrUtil.equals(forceCluster, s.cluster())).getFirstOptional();
+            if (cluster.isPresent()) {
+                return cluster.get().queue();
+            } else {
+                logger.warn("{} cluster not found", forceCluster);
+            }
+        }
+        if (metadata.containsKey(Constants.SCHEDULE_RECOMMEND)) {
+            String recommendCluster = metadata.get(Constants.SCHEDULE_RECOMMEND);
+            Optional<Cluster> cluster = clusters.select(s -> StrUtil.equals(recommendCluster, s.cluster())).getFirstOptional();
+            if (cluster.isPresent() && cluster.get().available()) {
+                return cluster.get().queue();
+            } else {
+                // The format call used to have no argument, so the cluster name was never logged.
+                logger.warn("{} cluster not found or busy", recommendCluster);
+            }
+        }
+        for (Cluster cluster : clusters) {
+            if (cluster.available()) {
+                return cluster.queue();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Quartz entry point: polls one item from the pre-schedule queue and forwards it. Without an
+     * available cluster the item goes back to the pre-schedule queue after a 15 second pause,
+     * up to six times, and after that to the default cluster's queue.
+     */
+    @Override
+    @SuppressWarnings("DataFlowIssue")
+    protected void executeInternal(JobExecutionContext context) {
+        QueueItem<ScheduleJob> item = QueueUtil.poll(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE);
+        if (ObjectUtil.isNotNull(item)) {
+            LogHelper.setMdc(Constants.LOG_JOB_ID_LABEL, item.getTraceId());
+            if (ObjectUtil.isNotNull(item.getData())) {
+                ScheduleJob data = item.getData();
+                LogHelper.setMdcFlinkJobAndAlias(data.getFlinkJobId(), data.getAlias());
+            }
+            try {
+                String availableQueue = getAvailableQueue(item);
+                // No available cluster could be found
+                if (ObjectUtil.isNull(availableQueue)) {
+                    // Read the retry count stored in the item's metadata
+                    long retryTime = item.getMetadata().containsKey(DISTRIBUTE_SCHEDULE_RETRY) ? Long.parseLong(item.getMetadata(DISTRIBUTE_SCHEDULE_RETRY)) : 0L;
+                    // Fewer than 6 retries so far
+                    if (retryTime < 6) {
+                        // Increase the retry count by one
+                        item.getMetadata().put(DISTRIBUTE_SCHEDULE_RETRY, String.valueOf(retryTime + 1));
+                        // Pause for 15 seconds
+                        ThreadUtil.safeSleep(15 * Constants.SECOND);
+                        // Put the item back into the pre queue
+                        QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, item);
+                    }
+                    // 6 or more retries
+                    else {
+                        logger.warn("Cannot found available cluster six time for item {}", item.getId());
+                        // Drop the retry field: the item is handed to the fallback queue, and if that queue
+                        // puts it back into the pre queue it is counted as a brand-new task.
+                        item.getMetadata().remove(DISTRIBUTE_SCHEDULE_RETRY);
+                        QueueUtil.add(discoveryClient, this.mapper, this.defaultCluster, item);
+                    }
+                } else {
+                    QueueUtil.add(discoveryClient, this.mapper, availableQueue, item);
+                }
+            } catch (Throwable e) {
+                logger.error("Distribute error", e);
+                QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, item);
+            } finally {
+                LogHelper.removeMdc(Constants.LOG_JOB_ID_LABEL, Constants.LOG_FLINK_JOB_ID_LABEL, Constants.LOG_ALIAS_LABEL);
+            }
+        }
+    }
+}
diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java
new file mode 100644
index 0000000..5ea0aad
--- /dev/null
+++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java
@@ -0,0 +1,58 @@
+package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.lanyuanxiaoyao.service.forest.service.HudiService;
+import com.lanyuanxiaoyao.service.forest.service.InfoService;
+import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import 
org.eclipse.collections.api.factory.Maps;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.JobExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
+
+/**
+ * Focus-table schedule: schedules only the tables whose priority reaches the focus threshold.
+ *
+ * @author ZhangJiacheng
+ * @date 2023-05-11
+ */
+@DisallowConcurrentExecution
+public class FocusScheduleJob extends BaseScheduleJob {
+    private static final Logger logger = LoggerFactory.getLogger(FocusScheduleJob.class);
+    /** Timestamp format used in the schedule comment; built once instead of on every execution. */
+    private static final DateTimeFormatter COMMENT_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    /** Tables with a priority at or above this value are scheduled by this job. */
+    private static final int FOCUS_PRIORITY_THRESHOLD = 10000;
+
+    private final DiscoveryClient discoveryClient;
+    private final InfoService infoService;
+    private final HudiService hudiService;
+    private final ObjectMapper mapper;
+
+    public FocusScheduleJob(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, Jackson2ObjectMapperBuilder builder) {
+        this.discoveryClient = discoveryClient;
+        this.infoService = infoService;
+        this.hudiService = hudiService;
+
+        this.mapper = builder.build();
+    }
+
+    /**
+     * Hands the tables with priority {@code >= 10000} to {@link ScheduleHelper#schedule} with empty metadata.
+     *
+     * @param comment free-text label passed through to the helper
+     */
+    public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) {
+        logger.info("Focus schedule");
+        ScheduleHelper.schedule(
+            discoveryClient,
+            infoService,
+            hudiService,
+            mapper,
+            meta -> meta.getPriority() >= FOCUS_PRIORITY_THRESHOLD,
+            comment,
+            Maps.immutable.empty()
+        );
+    }
+
+    /** Quartz entry point: runs the focus schedule, labelled with the current local time. */
+    @Override
+    protected void executeInternal(JobExecutionContext context) {
+        String now = LocalDateTime.now().format(COMMENT_TIME_FORMATTER);
+        schedule(discoveryClient, infoService, hudiService, mapper, "Focus schedule for " + now);
+    }
+}
diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java
new file mode 100644
index 0000000..2aac8a2
--- 
/dev/null
+++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java
@@ -0,0 +1,62 @@
+package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
+
+import cn.hutool.core.util.StrUtil;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.lanyuanxiaoyao.service.forest.service.HudiService;
+import com.lanyuanxiaoyao.service.forest.service.InfoService;
+import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import org.eclipse.collections.api.factory.Maps;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.JobExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
+
+/**
+ * Schedule for focus tables whose version has not been updated: only tables that reach the
+ * focus priority threshold and are reported by {@code InfoService.nonUpdatedVersionTables()}.
+ *
+ * @author ZhangJiacheng
+ * @date 2023-05-11
+ */
+@DisallowConcurrentExecution
+public class FocusUnVersionUpdateScheduleJob extends BaseScheduleJob {
+    private static final Logger logger = LoggerFactory.getLogger(FocusUnVersionUpdateScheduleJob.class);
+    /** Timestamp format used in the schedule comment; built once instead of on every execution. */
+    private static final DateTimeFormatter COMMENT_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    /** Tables with a priority at or above this value are candidates for this job. */
+    private static final int FOCUS_PRIORITY_THRESHOLD = 10000;
+
+    private final DiscoveryClient discoveryClient;
+    private final InfoService infoService;
+    private final HudiService hudiService;
+    private final ObjectMapper mapper;
+
+    public FocusUnVersionUpdateScheduleJob(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, Jackson2ObjectMapperBuilder builder) {
+        this.discoveryClient = discoveryClient;
+        this.infoService = infoService;
+        this.hudiService = hudiService;
+
+        this.mapper = builder.build();
+    }
+
+    /**
+     * Fetches the non-updated table ids once, then hands to {@link ScheduleHelper#schedule} the
+     * tables with priority {@code >= 10000} whose {@code "<flinkJobId>-<alias>"} id is in that list.
+     *
+     * @param comment free-text label passed through to the helper
+     */
+    public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) {
+        logger.info("Focus un update version schedule");
+        ImmutableList<String> unUpdateVersionTableIds = infoService.nonUpdatedVersionTables();
+        ScheduleHelper.schedule(
+            discoveryClient,
+            infoService,
+            hudiService,
+            mapper,
+            meta -> meta.getPriority() >= FOCUS_PRIORITY_THRESHOLD
+                && unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getJob().getId(), meta.getAlias())),
+            comment,
+            Maps.immutable.empty()
+        );
+    }
+
+    /** Quartz entry point: runs this schedule, labelled with the current local time. */
+    @Override
+    protected void executeInternal(JobExecutionContext context) {
+        String now = LocalDateTime.now().format(COMMENT_TIME_FORMATTER);
+        // NOTE(review): this label is identical to the one FocusScheduleJob uses, so the two jobs
+        // cannot be told apart by comment; it looks copy-pasted - confirm before changing it.
+        schedule(discoveryClient, infoService, hudiService, mapper, "Focus schedule for " + now);
+    }
+}
diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java
new file mode 100644
index 0000000..7040855
--- /dev/null
+++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java
@@ -0,0 +1,60 @@
+package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
+
+import com.eshore.odcp.hudi.connector.Constants;
+import com.eshore.odcp.hudi.connector.utils.TableMetaHelper;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.lanyuanxiaoyao.service.forest.service.HudiService;
+import com.lanyuanxiaoyao.service.forest.service.InfoService;
+import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import org.eclipse.collections.api.factory.Maps;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.JobExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
+
+/**
+ * ODS focus-table schedule: schedules only the tables tagged with {@code Constants.TAGS_ODS_FOCUS}.
+ *
+ * @author ZhangJiacheng
+ * @date 2023-05-11
+ */
+@DisallowConcurrentExecution
+public class OdsFocusScheduleJob 
extends BaseScheduleJob { + private static final Logger logger = LoggerFactory.getLogger(OdsFocusScheduleJob.class); + + private final DiscoveryClient discoveryClient; + private final InfoService infoService; + private final HudiService hudiService; + private final ObjectMapper mapper; + + public OdsFocusScheduleJob(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, Jackson2ObjectMapperBuilder builder) { + this.discoveryClient = discoveryClient; + this.infoService = infoService; + this.hudiService = hudiService; + + this.mapper = builder.build(); + } + + public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) { + logger.info("Ods focus schedule"); + ScheduleHelper.schedule( + discoveryClient, + infoService, + hudiService, + mapper, + meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_ODS_FOCUS), + comment, + Maps.immutable.empty() + ); + } + + @Override + protected void executeInternal(JobExecutionContext context) { + String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + schedule(discoveryClient, infoService, hudiService, mapper, "Ods focus schedule for " + now); + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/RemoveScheduledJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/RemoveScheduledJob.java new file mode 100644 index 0000000..39ae686 --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/RemoveScheduledJob.java @@ -0,0 +1,39 @@ +package com.lanyuanxiaoyao.service.scheduler.quartz.compaction; + +import com.eshore.odcp.hudi.connector.Constants; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import 
org.quartz.DisallowConcurrentExecution; +import org.quartz.JobExecutionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; + +/** + * 移除已完成表的跨天调度 + * + * @author ZhangJiacheng + * @date 2023-06-06 + */ +@DisallowConcurrentExecution +public class RemoveScheduledJob extends BaseScheduleJob { + private static final Logger logger = LoggerFactory.getLogger(RemoveScheduledJob.class); + + private final DiscoveryClient discoveryClient; + private final InfoService infoService; + private final ObjectMapper mapper; + + public RemoveScheduledJob(DiscoveryClient discoveryClient, InfoService infoService, Jackson2ObjectMapperBuilder builder) { + this.discoveryClient = discoveryClient; + this.infoService = infoService; + + this.mapper = builder.build(); + } + + @Override + protected void executeInternal(JobExecutionContext context) { + QueueUtil.remove(discoveryClient, mapper, Constants.COMPACTION_QUEUE_PRE, infoService.updatedVersionTables()); + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/A4Cluster.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/A4Cluster.java new file mode 100644 index 0000000..81c2940 --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/A4Cluster.java @@ -0,0 +1,30 @@ +package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster; + +import com.eshore.odcp.hudi.connector.Constants; +import com.lanyuanxiaoyao.service.forest.service.YarnService; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit; 
+import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.stereotype.Component; + +/** + * A4 cluster: available when the size of Constants.COMPACTION_QUEUE_A4 is below 10 and the used ratio of Yarn queue ten_iap.datalake is below 0.8. + * + * @author lanyuanxiaoyao + * @date 2023-06-08 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@Component +public class A4Cluster extends Cluster { + public A4Cluster(DiscoveryClient client, YarnService yarnService) { + super( + Constants.CLUSTER_A4, + Constants.COMPACTION_QUEUE_A4, + AvailableStrategy.and( + new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_A4, 10), + new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_A4, "ten_iap.datalake", 0.8) + ) + ); + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B12Cluster.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B12Cluster.java new file mode 100644 index 0000000..3fbd8b5 --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B12Cluster.java @@ -0,0 +1,30 @@ +package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster; + +import com.eshore.odcp.hudi.connector.Constants; +import com.lanyuanxiaoyao.service.forest.service.YarnService; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit; +import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.stereotype.Component; + +/** + * B12 cluster: available when the size of Constants.COMPACTION_QUEUE_B12 is below 20 and the used ratio of Yarn queue default is below 0.8. + * + * @author lanyuanxiaoyao + * @date 2023-06-08 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@Component +public class B12Cluster extends Cluster { + public B12Cluster(DiscoveryClient client, YarnService yarnService) { + super( + Constants.CLUSTER_B12, + Constants.COMPACTION_QUEUE_B12, + 
AvailableStrategy.and( + new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_B12, 20), + new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_B12, "default", 0.8) + ) + ); + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B1Cluster.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B1Cluster.java new file mode 100644 index 0000000..9b486b1 --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B1Cluster.java @@ -0,0 +1,30 @@ +package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster; + +import com.eshore.odcp.hudi.connector.Constants; +import com.lanyuanxiaoyao.service.forest.service.YarnService; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit; +import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.stereotype.Component; + +/** + * B1 cluster: available when the size of Constants.COMPACTION_QUEUE_B1 is below 20 and the used ratio of Yarn queue datalake is below 1.0. + * + * @author lanyuanxiaoyao + * @date 2023-06-08 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@Component +public class B1Cluster extends Cluster { + public B1Cluster(DiscoveryClient client, YarnService yarnService) { + super( + Constants.CLUSTER_B1, + Constants.COMPACTION_QUEUE_B1, + AvailableStrategy.and( + new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_B1, 20), + new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_B1, "datalake", 1.0) + ) + ); + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B5Cluster.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B5Cluster.java new file mode 100644 index 0000000..5151ea2 --- /dev/null +++ 
b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B5Cluster.java @@ -0,0 +1,36 @@ +package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster; + +import com.eshore.odcp.hudi.connector.Constants; +import com.lanyuanxiaoyao.service.forest.service.YarnService; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit; +import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.stereotype.Component; + +/** + * B5 cluster: available when the size of Constants.COMPACTION_QUEUE_B5 is below 10 and the used ratio of Yarn queue ten_iap.datalake is below 0.9; a time-dependent variant of the Yarn limit is kept commented out in the constructor. + * + * @author lanyuanxiaoyao + * @date 2023-06-08 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@Component +public class B5Cluster extends Cluster { + public B5Cluster(DiscoveryClient client, YarnService yarnService) { + super( + Constants.CLUSTER_B5, + Constants.COMPACTION_QUEUE_B5, + AvailableStrategy.and( + new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_B5, 10), + new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_B5, "ten_iap.datalake", 0.9) + /*AvailableStrategy.whether( + new DatetimeLimit("* * 0-1 * * ?"), + new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_B5, "ten_iap.datalake", 0.9), + new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_B5, "ten_iap.datalake", 0.5) + + )*/ + ) + ); + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/Cluster.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/Cluster.java new file mode 100644 index 0000000..6b3c513 --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/Cluster.java @@ -0,0 +1,52 @@ +package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster; + +import 
cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cluster: pairs a cluster name and its queue name with an AvailableStrategy; available() logs and returns false when the strategy throws. + * + * @author ZhangJiacheng + * @date 2023-06-08 + */ +public class Cluster { + private static final Logger logger = LoggerFactory.getLogger(Cluster.class); + + private final String cluster; + private final String queue; + private final AvailableStrategy availableStrategy; + + public Cluster(String cluster, String queue, AvailableStrategy availableStrategy) { + this.cluster = cluster; + this.queue = queue; + this.availableStrategy = availableStrategy; + } + + public String cluster() { + return cluster; + } + + public String queue() { + return queue; + } + + public boolean available() { + try { + return availableStrategy.available(); + } catch (Throwable throwable) { + logger.error(StrUtil.format("Check cluster {} available fail", this.cluster), throwable); + } + return false; + } + + @Override + public String toString() { + return "Cluster{" + + "cluster='" + cluster + '\'' + + ", queue='" + queue + '\'' + + ", availableStrategy=" + availableStrategy + + '}'; + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/AvailableStrategy.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/AvailableStrategy.java new file mode 100644 index 0000000..6427ee8 --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/AvailableStrategy.java @@ -0,0 +1,73 @@ +package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy; + +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * Strategy that decides whether a cluster has resources available; and/or combine several strategies, whether picks one of two strategies depending on a condition strategy. + * + * @author ZhangJiacheng + * @date 2023-05-31 + */ +public interface AvailableStrategy { + static AvailableStrategy and(AvailableStrategy... 
strategies) { + return new AndAvailableStrategy(strategies); + } + + static AvailableStrategy or(AvailableStrategy... strategies) { + return new OrAvailableStrategy(strategies); + } + + static AvailableStrategy whether(AvailableStrategy condition, AvailableStrategy trueOption, AvailableStrategy falseOption) { + return new WhetherAvailableStrategy(condition, trueOption, falseOption); + } + + boolean available(); + + abstract class BaseAvailableStrategy implements AvailableStrategy { + protected final ImmutableList strategies; + + public BaseAvailableStrategy(AvailableStrategy... strategies) { + this.strategies = Lists.immutable.of(strategies); + } + } + + final class AndAvailableStrategy extends BaseAvailableStrategy { + public AndAvailableStrategy(AvailableStrategy... strategies) { + super(strategies); + } + + @Override + public boolean available() { + return strategies.allSatisfy(AvailableStrategy::available); + } + } + + final class OrAvailableStrategy extends BaseAvailableStrategy { + public OrAvailableStrategy(AvailableStrategy... strategies) { + super(strategies); + } + + @Override + public boolean available() { + return strategies.anySatisfy(AvailableStrategy::available); + } + } + + final class WhetherAvailableStrategy implements AvailableStrategy { + private final AvailableStrategy condition; + private final AvailableStrategy trueOption; + private final AvailableStrategy falseOption; + + public WhetherAvailableStrategy(AvailableStrategy condition, AvailableStrategy trueOption, AvailableStrategy falseOption) { + this.condition = condition; + this.trueOption = trueOption; + this.falseOption = falseOption; + } + + @Override + public boolean available() { + return condition.available() ? 
trueOption.available() : falseOption.available(); + } + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/DatetimeLimit.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/DatetimeLimit.java new file mode 100644 index 0000000..4f25411 --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/DatetimeLimit.java @@ -0,0 +1,28 @@ +package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy; + +import com.cronutils.model.CronType; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.model.time.ExecutionTime; +import com.cronutils.parser.CronParser; +import java.time.ZonedDateTime; + +/** + * Time limit: available when ZonedDateTime.now() matches the cron expression, which is parsed with the Quartz cron definition. + * + * @author ZhangJiacheng + * @date 2023-06-08 + */ +public class DatetimeLimit implements AvailableStrategy { + private static final CronParser PARSER = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)); + + private final ExecutionTime execution; + + public DatetimeLimit(String cron) { + this.execution = ExecutionTime.forCron(PARSER.parse(cron)); + } + + @Override + public boolean available() { + return execution.isMatch(ZonedDateTime.now()); + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/QueueSizeLimit.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/QueueSizeLimit.java new file mode 100644 index 0000000..d87c89f --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/QueueSizeLimit.java @@ -0,0 +1,27 @@ +package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy; + +import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil; +import org.springframework.cloud.client.discovery.DiscoveryClient; + +/** + * Limit by the number of tasks remaining in the queue: available while QueueUtil.size for the queue is below the limit. + * + * @author 
ZhangJiacheng + * @date 2023-06-08 + */ +public class QueueSizeLimit implements AvailableStrategy { + private final DiscoveryClient discoveryClient; + private final String queue; + private final Integer limit; + + public QueueSizeLimit(DiscoveryClient discoveryClient, String queue, Integer limit) { + this.discoveryClient = discoveryClient; + this.queue = queue; + this.limit = limit; + } + + @Override + public boolean available() { + return QueueUtil.size(discoveryClient, queue) < limit; + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/YarnQueueUsedLimit.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/YarnQueueUsedLimit.java new file mode 100644 index 0000000..73ed210 --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/YarnQueueUsedLimit.java @@ -0,0 +1,34 @@ +package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy; + +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; +import com.lanyuanxiaoyao.service.forest.service.YarnService; + +/** + * Limit by Yarn queue usage: available while absoluteUsedCapacity divided by absoluteMaxCapacity of the queue (from yarnService.queueDetail) is below the limit. + * + * @author ZhangJiacheng + * @date 2023-06-08 + */ +public class YarnQueueUsedLimit implements AvailableStrategy { + private final YarnService yarnService; + private final String cluster; + private final String queue; + private final Double limit; + + public YarnQueueUsedLimit(YarnService yarnService, String cluster, String queue, Double limit) { + this.yarnService = yarnService; + this.cluster = cluster; + this.queue = queue; + this.limit = limit; + } + + private double queueUsed(String cluster, String queue) { + YarnQueue queueInfo = yarnService.queueDetail(cluster, queue); + return queueInfo.getAbsoluteUsedCapacity() * 1.0 / queueInfo.getAbsoluteMaxCapacity(); + } + + @Override + public boolean available() { + return queueUsed(cluster, queue) < limit; + } +} diff --git 
a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/strategy/ScheduleStrategy.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/strategy/ScheduleStrategy.java new file mode 100644 index 0000000..a8dbfb1 --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/strategy/ScheduleStrategy.java @@ -0,0 +1,24 @@ +package com.lanyuanxiaoyao.service.scheduler.strategy; + +import org.quartz.JobDetail; +import org.quartz.Trigger; + +/** + * Schedule strategy: a named, commented Quartz job together with its trigger, plus enable/show flags. + * + * @author ZhangJiacheng + * @date 2023-06-06 + */ +public interface ScheduleStrategy { + Boolean enable(); + + Boolean show(); + + String name(); + + String comment(); + + JobDetail job(); + + Trigger trigger(); +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/strategy/ScheduleStrategyImpl.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/strategy/ScheduleStrategyImpl.java new file mode 100644 index 0000000..ecd3c12 --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/strategy/ScheduleStrategyImpl.java @@ -0,0 +1,88 @@ +package com.lanyuanxiaoyao.service.scheduler.strategy; + +import cn.hutool.core.util.RandomUtil; +import java.time.Instant; +import java.util.Date; +import org.quartz.*; + +import static com.eshore.odcp.hudi.connector.Constants.SECOND; + +/** Base ScheduleStrategy implementation: holds name and comment, is always enabled, and builds the Quartz JobDetail and a cron Trigger whose start is delayed by a random number of seconds. NOTE(review): the trigger identity is name + "trigger" while the job identity is name + "_job"; confirm the missing underscore is intended. + * @author ZhangJiacheng + * @date 2023-06-06 + */ +public abstract class ScheduleStrategyImpl implements ScheduleStrategy { + protected final String name; + protected final String comment; + + protected ScheduleStrategyImpl(String name, String comment) { + this.name = name; + this.comment = comment; + } + + public static ScheduleStrategy simple(String name, String comment, Class job, String cron) { + return simple(true, name, comment, job, cron); + } + + public static ScheduleStrategy simple(Boolean show, String name, String comment, Class job, String cron) { + return new ScheduleStrategyImpl(name, 
comment) { + @Override + public Boolean show() { + return show; + } + + @Override + public JobDetail job() { + return generateJob(job); + } + + @Override + public Trigger trigger() { + return generateTrigger(cron); + } + }; + } + + @Override + public Boolean enable() { + return true; + } + + @Override + public String name() { + return name; + } + + @Override + public String comment() { + return comment; + } + + @Override + public String toString() { + return "BaseScheduleStrategy{" + + "name='" + name + '\'' + + ", comment='" + comment + '\'' + + '}'; + } + + protected JobDetail generateJob(Class job) { + return JobBuilder.newJob() + .withIdentity(name + "_job") + .withDescription(comment) + .ofType(job) + .build(); + } + + protected Trigger generateTrigger(String cron) { + return TriggerBuilder.newTrigger() + .withIdentity(name + "trigger") + .withDescription(cron) + .withSchedule( + CronScheduleBuilder + .cronSchedule(cron) + .withMisfireHandlingInstructionFireAndProceed()) + .startAt(Date.from(Instant.now().plusMillis(SECOND * RandomUtil.randomInt(30, 60)))) + .build(); + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/strategy/ScheduleStrategyVO.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/strategy/ScheduleStrategyVO.java new file mode 100644 index 0000000..18b61fa --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/strategy/ScheduleStrategyVO.java @@ -0,0 +1,72 @@ +package com.lanyuanxiaoyao.service.scheduler.strategy; + +import cn.hutool.core.util.ObjectUtil; + +/** Read-only view of a ScheduleStrategy: name, comment, job and trigger descriptions, and the previous/next fire times from Date.getTime() (0 when the trigger reports none). + * @author ZhangJiacheng + * @date 2023-06-26 + */ +public class ScheduleStrategyVO { + private final String name; + private final String comment; + private final String job; + private final String trigger; + private final Long lastFireTime; + private final Long nextFireTime; + + public ScheduleStrategyVO(String name, String comment, String job, String trigger, Long lastFireTime, Long 
nextFireTime) { + this.name = name; + this.comment = comment; + this.job = job; + this.trigger = trigger; + this.lastFireTime = lastFireTime; + this.nextFireTime = nextFireTime; + } + + public ScheduleStrategyVO(ScheduleStrategy strategy) { + this( + strategy.name(), + strategy.comment(), + strategy.job().getDescription(), + strategy.trigger().getDescription(), + ObjectUtil.isNull(strategy.trigger().getPreviousFireTime()) ? 0 : strategy.trigger().getPreviousFireTime().getTime(), + ObjectUtil.isNull(strategy.trigger().getNextFireTime()) ? 0 : strategy.trigger().getNextFireTime().getTime() + ); + } + + public String getName() { + return name; + } + + public String getComment() { + return comment; + } + + public String getJob() { + return job; + } + + public String getTrigger() { + return trigger; + } + + public Long getLastFireTime() { + return lastFireTime; + } + + public Long getNextFireTime() { + return nextFireTime; + } + + @Override + public String toString() { + return "ScheduleStrategyVO{" + + "name='" + name + '\'' + + ", comment='" + comment + '\'' + + ", job='" + job + '\'' + + ", trigger='" + trigger + '\'' + + ", lastFireTime=" + lastFireTime + + ", nextFireTime=" + nextFireTime + + '}'; + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java new file mode 100644 index 0000000..a48f58f --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java @@ -0,0 +1,149 @@ +package com.lanyuanxiaoyao.service.scheduler.utils; + +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.Constants; +import com.eshore.odcp.hudi.connector.entity.SyncState; +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import 
com.eshore.odcp.hudi.connector.entity.compaction.ScheduleJob; +import com.eshore.odcp.hudi.connector.utils.TableMetaHelper; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.loki4j.slf4j.marker.LabelMarker; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem; +import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil; +import com.lanyuanxiaoyao.service.forest.service.HudiService; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import java.util.Comparator; +import java.util.function.Predicate; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.map.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.springframework.cloud.client.discovery.DiscoveryClient; + +/** + * 压缩调度工具 + * + * @author lanyuanxiaoyao + * @date 2023-05-12 + */ +public class ScheduleHelper { + private static final Logger logger = LoggerFactory.getLogger(ScheduleHelper.class); + + private static Marker makeMarker(Long flinkJobId, String alias) { + return LabelMarker.of(() -> Maps.mutable.of(Constants.LOG_FLINK_JOB_ID, flinkJobId.toString(), Constants.LOG_ALIAS, alias)); + } + + public static void schedule( + DiscoveryClient discoveryClient, + InfoService infoService, + HudiService hudiService, + ObjectMapper mapper, + Predicate predicate, + String comment, + ImmutableMap metadata + ) { + String batchId = IdUtil.nanoId(10); + infoService.tableMetaList() + // 只调度 MOR 表 + .select(meta -> StrUtil.equals(Constants.MOR, meta.getHudi().getTargetTableType())) + .asParallel(ExecutorProvider.EXECUTORS, 1) + .select(predicate::test) + // 没有 Hudi 表的过滤掉 + .select(meta -> { + try { + return hudiService.existsHudiTable(meta.getJob().getId(), meta.getAlias()); + } catch (Throwable throwable) { + logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get hudi status failure", 
throwable); + } + return false; + }) + // 没有压缩计划的过滤掉 + .select(meta -> { + try { + return hudiService.existsCompactionPlan(meta.getJob().getId(), meta.getAlias()); + } catch (Throwable throwable) { + logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get compaction status failure", throwable); + } + return false; + }) + // 拒绝不压缩标志的任务 + .reject(meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) + .collect(meta -> { + long compactionDuration = 0L; + try { + // 计算压缩耗时 + SyncState syncState = infoService.syncStateDetail(meta.getJob().getId(), meta.getAlias()); + if (ObjectUtil.isNotNull(syncState) + && ObjectUtil.isNotNull(syncState.getCompactionFinishTime()) + && ObjectUtil.isNotNull(syncState.getCompactionStartTime())) { + compactionDuration = syncState.getCompactionFinishTime() - syncState.getCompactionStartTime(); + } + } catch (Throwable e) { + logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get sync state failure for {} {}", meta.getJob().getId(), meta.getAlias()); + } + return new TableMetaWrapper(meta, compactionDuration); + }) + .toSortedList( + Comparator + // 比较 Bucket 数,数量大的在前面 + .comparing(TableMetaWrapper::getBucketIndexNumber, Comparator.reverseOrder()) + // 比较压缩耗时,压缩耗时长的在前面 + .thenComparing(TableMetaWrapper::getCompactionDuration, Comparator.reverseOrder())) + .collect(meta -> new QueueItem<>( + StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias()), + metadata.toMap(), + meta.getPriority(), + ScheduleJob.builder() + .id(IdUtil.nanoId(10)) + .flinkJobId(meta.getFlinkJobId()) + .alias(meta.getAlias()) + .batch(batchId) + .status(Constants.COMPACTION_STATUS_SCHEDULE) + .comment(comment) + .build() + )) + .forEach(item -> { + // 将任务放入预处理队列等待处理 + QueueUtil.add(discoveryClient, mapper, Constants.COMPACTION_QUEUE_PRE, item); + if (ObjectUtil.isNotNull(item.getData())) { + logger.info(makeMarker(item.getData().getFlinkJobId(), item.getData().getAlias()), "Schedule {}", item); + } else { + 
logger.warn("Item data is null, {}", item); + } + }); + } + + private static final class TableMetaWrapper { + private final TableMeta tableMeta; + private final Long compactionDuration; + + private TableMetaWrapper(TableMeta tableMeta, Long compactionDuration) { + this.tableMeta = tableMeta; + this.compactionDuration = compactionDuration; + } + + public Long getFlinkJobId() { + return tableMeta.getJob().getId(); + } + + public String getAlias() { + return tableMeta.getAlias(); + } + + public Integer getBucketIndexNumber() { + return tableMeta.getHudi().getBucketIndexNumber(); + } + + public Integer getPriority() { + return tableMeta.getPriority(); + } + + public Long getCompactionDuration() { + return compactionDuration; + } + } +} diff --git a/service-scheduler/src/main/resources/application.yml b/service-scheduler/src/main/resources/application.yml new file mode 100644 index 0000000..2b9308c --- /dev/null +++ b/service-scheduler/src/main/resources/application.yml @@ -0,0 +1,5 @@ +spring: + application: + name: service-scheduler + profiles: + include: random-port,common,discovery,metrics,forest \ No newline at end of file diff --git a/service-scheduler/src/main/resources/logback-spring.xml b/service-scheduler/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..17fb54a --- /dev/null +++ b/service-scheduler/src/main/resources/logback-spring.xml @@ -0,0 +1,52 @@ + + + + + + + + + + + true + + ${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push} + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx + + true + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %clr(%5p) %clr([${HOSTNAME}]){yellow} %clr([%t]){magenta} %clr(%logger{40}){cyan} #@# %m%n%wEx + + + + + ${LOGGING_PARENT:-.}/${APP_NAME:-run}.log + + ${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz + 7 + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx + + + + + + + + + + + + \ No newline at end of file