feat(scheduler): 迁移service-scheduler项目

This commit is contained in:
2024-02-28 15:20:52 +08:00
parent 14409ae798
commit f258bfe5b0
29 changed files with 1658 additions and 0 deletions

4
bin/build-scheduler.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/bin/bash
# Build and ship the service-scheduler executable jar.
# FIX: abort immediately if any step fails, so a stale jar is never transferred.
set -euo pipefail

# 1. Deploy the shared modules the scheduler depends on (tests skipped, local profile).
mvn -pl service-dependencies,service-configuration clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml
# 2. Package service-scheduler as a Spring Boot fat jar.
mvn -pl service-scheduler clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml
# 3. Transfer the built artifact to the target host.
ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-scheduler/target/service-scheduler-1.0.0-SNAPSHOT.jar

View File

@@ -29,6 +29,7 @@
<module>service-exporter</module>
<module>service-check</module>
<module>service-api</module>
<module>service-scheduler</module>
</modules>
<properties>

View File

@@ -0,0 +1,197 @@
package com.lanyuanxiaoyao.service.configuration.utils;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import com.eshore.odcp.hudi.connector.Constants;
import com.eshore.odcp.hudi.connector.entity.compaction.ScheduleJob;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
/**
 * HTTP helpers for the remote compaction queue exposed by the "service-queue"
 * application, which is looked up through Spring Cloud service discovery.
 *
 * <p>Read/add/poll operations are retried through {@link Failsafe}; the query
 * and remove operations instead degrade to a safe default value on failure.
 *
 * @author lanyuanxiaoyao
 * @date 2023-05-08
 */
public class QueueUtil {
    private static final Logger logger = LoggerFactory.getLogger(QueueUtil.class);

    // Retry any failure up to 12 times with a fixed 5-second delay between attempts.
    private static final RetryPolicy<Object> RETRY = RetryPolicy.builder()
            .handle(Throwable.class)
            .withDelay(Duration.ofSeconds(5))
            .withMaxAttempts(12)
            .build();

    /**
     * Resolves the base URL of the first registered "service-queue" instance.
     *
     * @throws RuntimeException when no instance is currently registered
     */
    private static String getQueueUrl(DiscoveryClient client) {
        List<ServiceInstance> instances = client.getInstances("service-queue");
        if (ObjectUtil.isEmpty(instances)) {
            logger.info("Current instances: {}", String.join(",", client.getServices()));
            throw new RuntimeException("Queue instance not found");
        }
        return instances.get(0).getUri().toString();
    }

    /**
     * Lists the names of all queues.
     *
     * @return queue names, or an empty list when the response is not 2xx
     */
    public static ImmutableList<String> names(DiscoveryClient client, ObjectMapper mapper) {
        return Failsafe.with(RETRY)
                .get(() -> {
                    try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(getQueueUrl(client), "/names"))
                            .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
                            .execute()) {
                        if (response.isOk()) {
                            return mapper.readValue(response.body(), new TypeReference<ImmutableList<String>>() {
                            });
                        }
                        return Lists.immutable.empty();
                    }
                });
    }

    /**
     * Appends a schedule job to the named queue.
     *
     * @return {@code true} only when the service acknowledged the addition
     */
    public static Boolean add(DiscoveryClient client, ObjectMapper mapper, String name, QueueItem<ScheduleJob> job) {
        try {
            if (ObjectUtil.isNull(job)) {
                logger.warn("Job cannot be null");
                return false;
            }
            String body = mapper.writeValueAsString(job);
            if (StrUtil.isBlank(body)) {
                logger.error("Body cannot be blank");
                return false;
            }
            return Failsafe.with(RETRY)
                    .get(() -> {
                        try (HttpResponse response = HttpUtil.createPost(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/add?name={}", name)))
                                .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
                                .body(body)
                                .execute()) {
                            if (response.isOk()) {
                                return Boolean.valueOf(response.body());
                            }
                            return false;
                        }
                    });
        } catch (Throwable e) {
            logger.error("Add job to queue error", e);
        }
        return false;
    }

    /**
     * Removes and returns the head of the named queue.
     *
     * @return the polled item, or {@code null} when the queue is empty,
     *         the body cannot be parsed, or the call ultimately fails
     */
    public static QueueItem<ScheduleJob> poll(DiscoveryClient client, ObjectMapper mapper, String name) {
        try {
            return Failsafe.with(RETRY)
                    .get(() -> {
                        try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/poll?name={}", name)))
                                .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
                                .execute()) {
                            if (response.isOk()) {
                                String body = response.body();
                                if (StrUtil.isNotBlank(body)) {
                                    try {
                                        return mapper.readValue(body, new TypeReference<QueueItem<ScheduleJob>>() {
                                        });
                                    } catch (Throwable error) {
                                        // A malformed payload is logged and dropped rather than retried.
                                        logger.error("Schedule job parse error", error);
                                    }
                                }
                            }
                            return null;
                        }
                    });
        } catch (Throwable e) {
            logger.error("Poll job from queue error", e);
        }
        return null;
    }

    /**
     * Checks whether the named queue is empty.
     *
     * @return the service's answer; defaults to {@code true} on any failure
     */
    public static Boolean isEmpty(DiscoveryClient client, String name) {
        try {
            try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/is_empty?name={}", name)))
                    .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
                    .execute()) {
                if (response.isOk()) {
                    String body = response.body();
                    if (StrUtil.isNotBlank(body)) {
                        return Boolean.valueOf(body);
                    }
                }
                return true;
            }
        } catch (Throwable e) {
            logger.error("Query empty failure", e);
        }
        return true;
    }

    /**
     * Returns the number of items in the named queue.
     *
     * @return the size, or {@code -1} when it cannot be determined
     */
    public static Integer size(DiscoveryClient client, String name) {
        try {
            try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/size?name={}", name)))
                    .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
                    .execute()) {
                if (response.isOk()) {
                    String body = response.body();
                    if (StrUtil.isNotBlank(body)) {
                        return Integer.valueOf(body);
                    }
                }
                return -1;
            }
        } catch (Throwable e) {
            // FIX: was "Query empty failure", copy-pasted from isEmpty().
            logger.error("Query size failure", e);
        }
        return -1;
    }

    /**
     * Removes every item with the given id from the named queue.
     *
     * @return {@code false} only when the HTTP call itself fails
     */
    public static Boolean remove(DiscoveryClient client, String name, String id) {
        try {
            try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/remove_all_id?name={}&id={}", name, id)))
                    .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
                    .execute()) {
                if (response.isOk()) {
                    return Boolean.valueOf(response.body());
                }
                // NOTE(review): a non-2xx response reports success here while an
                // exception reports failure — looks inconsistent; confirm intent.
                return true;
            }
        } catch (Throwable e) {
            logger.error(StrUtil.format("Remove {} failure", id), e);
        }
        return false;
    }

    /**
     * Removes every item whose id appears in {@code ids} from the named queue.
     *
     * @return {@code false} only when serialization or the HTTP call fails
     */
    public static Boolean remove(DiscoveryClient client, ObjectMapper mapper, String name, ImmutableList<String> ids) {
        try {
            String body = mapper.writeValueAsString(ids);
            if (StrUtil.isBlank(body)) {
                logger.error("Body cannot be blank");
                return false;
            }
            try (HttpResponse response = HttpUtil.createPost(URLUtil.completeUrl(getQueueUrl(client), StrUtil.format("/queue/remove_all_id?name={}", name)))
                    .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
                    .body(body)
                    .execute()) {
                if (response.isOk()) {
                    return Boolean.valueOf(response.body());
                }
                // NOTE(review): same success-on-non-2xx behavior as the single-id
                // overload — kept for consistency; confirm intent.
                return true;
            }
        } catch (Throwable e) {
            logger.error(StrUtil.format("Remove {} failure", ids), e);
        }
        return false;
    }

    /** A {@link java.util.function.Function} variant whose body may throw. */
    @FunctionalInterface
    public interface Function<F, T> {
        T apply(F f) throws Throwable;
    }
}

40
service-scheduler/pom.xml Normal file
View File

@@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven descriptor for the service-scheduler module: a Spring Boot
     application that drives Quartz-based compaction scheduling. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherits dependency/plugin management from the hudi-service parent. -->
    <parent>
        <groupId>com.lanyuanxiaoyao</groupId>
        <artifactId>hudi-service</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <artifactId>service-scheduler</artifactId>
    <dependencies>
        <!-- Quartz scheduler integration (version managed by Spring Boot). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <!-- Sibling module providing the shared service clients. -->
        <dependency>
            <groupId>com.lanyuanxiaoyao</groupId>
            <artifactId>service-forest</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
        <!-- Cron parsing/next-execution computation used by the controller. -->
        <dependency>
            <groupId>com.cronutils</groupId>
            <artifactId>cron-utils</artifactId>
            <version>9.2.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Repackages the jar as an executable Spring Boot fat jar. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

View File

@@ -0,0 +1,72 @@
package com.lanyuanxiaoyao.service.scheduler;
import cn.hutool.core.thread.ThreadUtil;
import com.lanyuanxiaoyao.service.scheduler.strategy.ScheduleStrategy;
import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
import org.eclipse.collections.api.list.ImmutableList;
import org.quartz.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScans;
/**
 * Scheduler service entry point.
 *
 * <p>On startup it refuses to run if another instance is already registered
 * (the queue does not support distributed schedulers), then registers every
 * enabled {@link ScheduleStrategy} with Quartz.
 *
 * @author ZhangJiacheng
 * @date 2023-05-11
 */
@EnableDiscoveryClient
@SpringBootApplication(exclude = {GsonAutoConfiguration.class})
@ComponentScans({
        @ComponentScan("com.lanyuanxiaoyao.service"),
})
@EnableConfigurationProperties
@EnableEncryptableProperties
public class SchedulerApplication implements ApplicationRunner {
    private static final Logger logger = LoggerFactory.getLogger(SchedulerApplication.class);

    private final DiscoveryClient client;
    private final Scheduler quartzScheduler;
    private final ImmutableList<ScheduleStrategy> strategies;

    // FIX: the property key must be wrapped in "${...}". The previous literal
    // @Value("spring.application.name") injected the raw placeholder string,
    // so the duplicate-instance guard in run() could never match.
    @Value("${spring.application.name}")
    private String application;

    public SchedulerApplication(DiscoveryClient client, Scheduler quartzScheduler, ImmutableList<ScheduleStrategy> strategies) {
        this.client = client;
        this.quartzScheduler = quartzScheduler;
        this.strategies = strategies;
    }

    public static void main(String[] args) {
        SpringApplication.run(SchedulerApplication.class, args);
    }

    /**
     * Exits when another instance is already registered, otherwise registers
     * all enabled schedule strategies with the Quartz scheduler.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // The queue does not support distributed startup; only one instance may run.
        if (client.getServices().contains(application)) {
            logger.error("Found {} exists, exit...", application);
            logger.warn("Wait for 15s and exit");
            ThreadUtil.safeSleep(15000);
            System.exit(1);
        }
        for (ScheduleStrategy strategy : strategies) {
            if (strategy.enable()) {
                logger.info("Schedule {} ({})", strategy.name(), strategy.comment());
                quartzScheduler.scheduleJob(strategy.job(), strategy.trigger());
            } else {
                logger.warn("Strategy {} ({}) is disabled", strategy.name(), strategy.comment());
            }
        }
    }
}

View File

@@ -0,0 +1,40 @@
package com.lanyuanxiaoyao.service.scheduler.configuration;
import com.lanyuanxiaoyao.service.scheduler.quartz.compaction.*;
import com.lanyuanxiaoyao.service.scheduler.strategy.ScheduleStrategy;
import com.lanyuanxiaoyao.service.scheduler.strategy.ScheduleStrategyImpl;
import com.lanyuanxiaoyao.service.scheduler.strategy.ScheduleStrategyVO;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the full set of Quartz schedule strategies and exposes them, plus
 * the view objects of the enabled ones, as Spring beans.
 *
 * @author ZhangJiacheng
 * @date 2023-06-06
 */
@Configuration
public class ScheduleStrategyProvider {
    /**
     * All strategies with their cron expressions. Entries built with
     * {@code simple(false, ...)} are presumably disabled — TODO confirm
     * against ScheduleStrategyImpl.
     */
    @Bean
    public ImmutableList<ScheduleStrategy> strategies() {
        return Lists.immutable.of(
                // Distributes queued jobs every 2 seconds (currently disabled).
                ScheduleStrategyImpl.simple(false, "distribute_schedule", "定时分布式调度", DistributeScheduleJob.class, "0/2 * * * * ?"),
                // Regular full-table schedule
                ScheduleStrategyImpl.simple("daily_schedule", "普通全表调度", DailyScheduleJob.class, "0 50 1,4,7,10,13,16,19 * * ?"),
                // Evening focus-table schedule
                ScheduleStrategyImpl.simple("focus_evening_schedule", "晚间重点表调度", FocusScheduleJob.class, "0 50 20,21,22 * * ?"),
                // ODS focus-table schedule
                ScheduleStrategyImpl.simple("ods_focus_schedule", "ODS 重点表调度", OdsFocusScheduleJob.class, "0 30 23 * * ?"),
                // Cross-day (busy-hour) focus-table schedule
                ScheduleStrategyImpl.simple("focus_schedule", "重点表跨天调度", FocusUnVersionUpdateScheduleJob.class, "0 0,10,20,40 0,1 * * ?"),
                // Promptly removes already-completed cross-day tasks (currently disabled).
                ScheduleStrategyImpl.simple(false, "remove_scheduled", "跨天调度时及时删除已完成跨天的任务", RemoveScheduledJob.class, "0 * 0,1 * * ?")
        );
    }

    /** View objects for the enabled strategies only. */
    @Bean
    public ImmutableList<ScheduleStrategyVO> strategyVOS(ImmutableList<ScheduleStrategy> strategies) {
        return strategies
                .select(ScheduleStrategy::enable)
                .collect(ScheduleStrategyVO::new);
    }
}

View File

@@ -0,0 +1,155 @@
package com.lanyuanxiaoyao.service.scheduler.controller;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil;
import com.cronutils.model.Cron;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import com.eshore.odcp.hudi.connector.Constants;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.launcher.LaunchersService;
import com.lanyuanxiaoyao.service.scheduler.quartz.compaction.DailyScheduleJob;
import com.lanyuanxiaoyao.service.scheduler.quartz.compaction.FocusScheduleJob;
import com.lanyuanxiaoyao.service.scheduler.quartz.compaction.FocusUnVersionUpdateScheduleJob;
import com.lanyuanxiaoyao.service.scheduler.strategy.ScheduleStrategyVO;
import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.api.map.MutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * External HTTP API of the scheduler: exposes the configured schedule
 * strategies and allows triggering or stopping compaction schedules manually.
 *
 * @author ZhangJiacheng
 * @date 2023-05-12
 */
@RestController
@RequestMapping("schedule")
public class ScheduleController {
    private static final Logger logger = LoggerFactory.getLogger(ScheduleController.class);

    private final DiscoveryClient discoveryClient;
    private final InfoService infoService;
    private final HudiService hudiService;
    private final ImmutableList<ScheduleStrategyVO> strategyVOS;
    private final LaunchersService launchersService;
    private final ObjectMapper mapper;

    // Per cron expression, caches every execution time that falls on the
    // current calendar day; entries expire 10 minutes after being computed.
    private final LoadingCache<String, ImmutableList<ZonedDateTime>> scheduleJobTimeCache = Caffeine.newBuilder()
            .expireAfterWrite(Duration.of(10, ChronoUnit.MINUTES))
            .build(cronExpression -> {
                MutableList<ZonedDateTime> times = Lists.mutable.empty();
                CronParser cronParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ));
                // Window is [start of today, start of tomorrow).
                ZonedDateTime now = ZonedDateTime.now().withHour(0).withMinute(0).withSecond(0).withNano(0);
                ZonedDateTime night = now.plusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
                Cron cron = cronParser.parse(cronExpression);
                ExecutionTime executionTime = ExecutionTime.forCron(cron);
                // Walk the cron's next-execution chain until tomorrow is reached.
                Optional<ZonedDateTime> zonedDateTime = executionTime.nextExecution(now);
                while (zonedDateTime.isPresent() && zonedDateTime.get().isBefore(night)) {
                    times.add(zonedDateTime.get());
                    zonedDateTime = executionTime.nextExecution(zonedDateTime.get());
                }
                return times.toImmutable();
            });

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    public ScheduleController(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ImmutableList<ScheduleStrategyVO> strategyVOS, LaunchersService launchersService, Jackson2ObjectMapperBuilder builder) {
        this.discoveryClient = discoveryClient;
        this.infoService = infoService;
        this.hudiService = hudiService;
        this.strategyVOS = strategyVOS;
        this.launchersService = launchersService;
        this.mapper = builder.build();
    }

    /** Lists the view objects of the enabled schedule strategies. */
    @GetMapping("schedule_jobs")
    public ImmutableList<ScheduleStrategyVO> scheduleJobs() {
        return strategyVOS;
    }

    /**
     * Returns today's planned execution times as epoch millis.
     *
     * <p>NOTE(review): the implementation is commented out and the endpoint
     * currently always returns an empty list; the cache above is therefore
     * unused. Confirm whether this is intentional.
     */
    @GetMapping("schedule_times")
    public ImmutableList<Long> scheduleJobTimes() {
        /*return strategies
                .select(ScheduleStrategy::enable)
                .select(ScheduleStrategy::show)
                .select(strategy -> (strategy.trigger()) instanceof CronTrigger)
                .flatCollect(strategy -> scheduleJobTimeCache.get(((CronTrigger) strategy.trigger()).getCronExpression()))
                .toSortedList(ZonedDateTime::compareTo)
                .collect(time -> time.toInstant().toEpochMilli())
                .toImmutable();*/
        return Lists.immutable.empty();
    }

    /** Manually triggers the full-table daily schedule. */
    @GetMapping("all")
    public void scheduleAllTable() {
        DailyScheduleJob.schedule(discoveryClient, infoService, hudiService, mapper, "Daily schedule manually");
    }

    /** Manually triggers the focus-table schedule. */
    @GetMapping("all_focus")
    public void scheduleAllFocus() {
        FocusScheduleJob.schedule(discoveryClient, infoService, hudiService, mapper, "Focus schedule manually");
    }

    /** Manually triggers the schedule for focus tables whose version has not updated. */
    @GetMapping("all_un_scheduled")
    public void scheduleAllUnScheduledTable() {
        FocusUnVersionUpdateScheduleJob.schedule(discoveryClient, infoService, hudiService, mapper, "Un-updated version schedule manually");
    }

    /**
     * Manually schedules a single table identified by Flink job id + alias.
     *
     * @param forceCluster     optional cluster the job MUST run on
     * @param recommendCluster optional cluster the job SHOULD run on if available
     */
    @GetMapping("table")
    public void scheduleTable(
            @RequestParam("flink_job_id") Long flinkJobId,
            @RequestParam("alias") String alias,
            @RequestParam(value = "force", required = false) String forceCluster,
            @RequestParam(value = "recommend", required = false) String recommendCluster
    ) {
        MutableMap<String, String> metadata = Maps.mutable.empty();
        if (StrUtil.isNotBlank(forceCluster)) {
            metadata.put(Constants.SCHEDULE_FORCE, forceCluster);
        }
        if (StrUtil.isNotBlank(recommendCluster)) {
            metadata.put(Constants.SCHEDULE_RECOMMEND, recommendCluster);
        }
        ScheduleHelper.schedule(
                discoveryClient,
                infoService,
                hudiService,
                mapper,
                meta -> StrUtil.equals(meta.getAlias(), alias) && NumberUtil.equals(meta.getJob().getId(), flinkJobId),
                "Schedule manually",
                metadata.toImmutable()
        );
    }

    /**
     * Stops compaction for one table: removes it from the pre-schedule queue,
     * stops the running compaction, then removes it again.
     */
    @GetMapping("stop_all")
    public void stopTheWorld(
            @RequestParam("flink_job_id") Long flinkJobId,
            @RequestParam("alias") String alias,
            @RequestParam(value = "disable_meta", defaultValue = "true") Boolean disableMeta
    ) {
        QueueUtil.remove(discoveryClient, Constants.COMPACTION_QUEUE_PRE, StrUtil.format("{}-{}", flinkJobId, alias));
        launchersService.compactionStop(flinkJobId, alias, disableMeta);
        // Remove once more: a race could otherwise put the job back into the
        // pre-schedule queue between the first removal and the stop call.
        QueueUtil.remove(discoveryClient, Constants.COMPACTION_QUEUE_PRE, StrUtil.format("{}-{}", flinkJobId, alias));
    }
}

View File

@@ -0,0 +1,15 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;
/**
* 一些通用方法
*
* @author ZhangJiacheng
* @date 2023-05-11
*/
public abstract class BaseScheduleJob extends QuartzJobBean {
private static final Logger logger = LoggerFactory.getLogger(BaseScheduleJob.class);
}

View File

@@ -0,0 +1,58 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.eclipse.collections.api.factory.Maps;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
/**
 * Regular daily schedule: pushes every table into the compaction queue.
 *
 * @author ZhangJiacheng
 * @date 2023-05-11
 */
@DisallowConcurrentExecution
public class DailyScheduleJob extends BaseScheduleJob {
    private static final Logger logger = LoggerFactory.getLogger(DailyScheduleJob.class);

    /** Formatter for the human-readable timestamp appended to the schedule comment. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final DiscoveryClient discoveryClient;
    private final InfoService infoService;
    private final HudiService hudiService;
    private final ObjectMapper mapper;

    public DailyScheduleJob(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, Jackson2ObjectMapperBuilder builder) {
        this.discoveryClient = discoveryClient;
        this.infoService = infoService;
        this.hudiService = hudiService;
        this.mapper = builder.build();
    }

    /**
     * Schedules every table (the predicate accepts all metadata).
     *
     * @param comment human-readable reason recorded with the schedule
     */
    public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) {
        logger.info("Daily schedule");
        ScheduleHelper.schedule(discoveryClient, infoService, hudiService, mapper, meta -> true, comment, Maps.immutable.empty());
    }

    @Override
    protected void executeInternal(JobExecutionContext context) {
        String timestamp = LocalDateTime.now().format(TIMESTAMP_FORMAT);
        schedule(discoveryClient, infoService, hudiService, mapper, "Daily schedule for " + timestamp);
    }
}

View File

@@ -0,0 +1,127 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.Constants;
import com.eshore.odcp.hudi.connector.entity.compaction.ScheduleJob;
import com.eshore.odcp.hudi.connector.utils.LogHelper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnClusters;
import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster.Cluster;
import java.util.Optional;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.api.map.ImmutableMap;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.ApplicationContext;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
/**
 * Distributes queued compaction jobs to an available YARN cluster queue.
 *
 * <p>Polls one item from the pre-schedule queue and routes it by, in order:
 * forced cluster, recommended cluster (if available), first available cluster.
 * When no cluster is available the item is retried up to 6 times before being
 * handed to the default (fallback) cluster queue.
 *
 * @author lanyuanxiaoyao
 * @date 2023-05-25
 */
@DisallowConcurrentExecution
public class DistributeScheduleJob extends BaseScheduleJob {
    private static final Logger logger = LoggerFactory.getLogger(DistributeScheduleJob.class);

    // Metadata key holding how many times distribution has been retried for an item.
    private static final String DISTRIBUTE_SCHEDULE_RETRY = "distribute_schedule_retry";

    private final DiscoveryClient discoveryClient;
    private final ObjectMapper mapper;
    private final ImmutableList<Cluster> clusters;
    private final String defaultCluster;

    public DistributeScheduleJob(DiscoveryClient discoveryClient, YarnClusters yarnClusters, ApplicationContext applicationContext, Jackson2ObjectMapperBuilder builder) {
        this.discoveryClient = discoveryClient;
        this.mapper = builder.build();
        // Resolve one Cluster bean per active cluster, by the "<name>Cluster" naming convention.
        MutableList<Cluster> list = Lists.mutable.empty();
        for (String cluster : yarnClusters.getActiveClusters()) {
            list.add(applicationContext.getBean(StrUtil.format("{}Cluster", StrUtil.toCamelCase(cluster)), Cluster.class));
        }
        this.clusters = list.toImmutable();
        this.defaultCluster = yarnClusters.getDefaultSyncCluster();
    }

    /**
     * Picks the target queue for an item: forced cluster first, then the
     * recommended cluster when available, then any available cluster.
     *
     * @return the chosen cluster queue, or {@code null} when none is available
     */
    private String getAvailableQueue(QueueItem<ScheduleJob> item) {
        ImmutableMap<String, String> metadata = Maps.immutable.ofAll(item.getMetadata());
        if (metadata.containsKey(Constants.SCHEDULE_FORCE)) {
            String forceCluster = metadata.get(Constants.SCHEDULE_FORCE);
            Optional<Cluster> cluster = clusters.select(s -> StrUtil.equals(forceCluster, s.cluster())).getFirstOptional();
            if (cluster.isPresent()) {
                // A forced cluster is used even when busy.
                return cluster.get().queue();
            } else {
                logger.warn(StrUtil.format("{} cluster not found", forceCluster));
            }
        }
        if (metadata.containsKey(Constants.SCHEDULE_RECOMMEND)) {
            String recommendCluster = metadata.get(Constants.SCHEDULE_RECOMMEND);
            Optional<Cluster> cluster = clusters.select(s -> StrUtil.equals(recommendCluster, s.cluster())).getFirstOptional();
            if (cluster.isPresent() && cluster.get().available()) {
                return cluster.get().queue();
            } else {
                // FIX: the placeholder previously had no argument, so the log
                // printed a literal "{}" instead of the cluster name.
                logger.warn(StrUtil.format("{} cluster not found or busy", recommendCluster));
            }
        }
        for (Cluster cluster : clusters) {
            if (cluster.available()) {
                return cluster.queue();
            }
        }
        return null;
    }

    @Override
    @SuppressWarnings("DataFlowIssue")
    protected void executeInternal(JobExecutionContext context) {
        QueueItem<ScheduleJob> item = QueueUtil.poll(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE);
        if (ObjectUtil.isNotNull(item)) {
            LogHelper.setMdc(Constants.LOG_JOB_ID_LABEL, item.getTraceId());
            if (ObjectUtil.isNotNull(item.getData())) {
                ScheduleJob data = item.getData();
                LogHelper.setMdcFlinkJobAndAlias(data.getFlinkJobId(), data.getAlias());
            }
            try {
                String availableQueue = getAvailableQueue(item);
                // No schedulable cluster available right now.
                if (ObjectUtil.isNull(availableQueue)) {
                    // Read the retry count carried in the item's metadata.
                    // NOTE(review): getMetadata(key) differs from the
                    // getMetadata().containsKey/put calls around it — confirm the
                    // single-argument overload is equivalent to getMetadata().get(key).
                    long retryTime = item.getMetadata().containsKey(DISTRIBUTE_SCHEDULE_RETRY) ? Long.parseLong(item.getMetadata(DISTRIBUTE_SCHEDULE_RETRY)) : 0L;
                    if (retryTime < 6) {
                        // Bump the retry count, back off 15s, and put the item
                        // back into the pre-schedule queue.
                        item.getMetadata().put(DISTRIBUTE_SCHEDULE_RETRY, String.valueOf(retryTime + 1));
                        ThreadUtil.safeSleep(15 * Constants.SECOND);
                        QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, item);
                    }
                    // Retries exhausted: hand over to the fallback cluster queue.
                    else {
                        logger.warn("Cannot found available cluster six time for item " + item.getId());
                        // Drop the retry marker: if the fallback queue ever puts the
                        // item back into the pre queue it restarts with a fresh count.
                        item.getMetadata().remove(DISTRIBUTE_SCHEDULE_RETRY);
                        QueueUtil.add(discoveryClient, this.mapper, this.defaultCluster, item);
                    }
                } else {
                    QueueUtil.add(discoveryClient, this.mapper, availableQueue, item);
                }
            } catch (Throwable e) {
                // On any failure, return the item to the pre-schedule queue so it is not lost.
                logger.error("Distribute error", e);
                QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, item);
            } finally {
                LogHelper.removeMdc(Constants.LOG_JOB_ID_LABEL, Constants.LOG_FLINK_JOB_ID_LABEL, Constants.LOG_ALIAS_LABEL);
            }
        }
    }
}

View File

@@ -0,0 +1,58 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.eclipse.collections.api.factory.Maps;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
/**
 * Focus-table schedule: enqueues compaction for tables whose priority is at
 * least 10000. (The original header was copy-pasted from the daily job.)
 *
 * @author ZhangJiacheng
 * @date 2023-05-11
 */
@DisallowConcurrentExecution
public class FocusScheduleJob extends BaseScheduleJob {
    private static final Logger logger = LoggerFactory.getLogger(FocusScheduleJob.class);

    private final DiscoveryClient discoveryClient;
    private final InfoService infoService;
    private final HudiService hudiService;
    private final ObjectMapper mapper;

    public FocusScheduleJob(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, Jackson2ObjectMapperBuilder builder) {
        this.discoveryClient = discoveryClient;
        this.infoService = infoService;
        this.hudiService = hudiService;
        this.mapper = builder.build();
    }

    /**
     * Schedules every table whose priority is {@code >= 10000}.
     *
     * @param comment human-readable reason recorded with the schedule
     */
    public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) {
        logger.info("Focus schedule");
        ScheduleHelper.schedule(
                discoveryClient,
                infoService,
                hudiService,
                mapper,
                meta -> meta.getPriority() >= 10000,
                comment,
                Maps.immutable.empty()
        );
    }

    @Override
    protected void executeInternal(JobExecutionContext context) {
        String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        schedule(discoveryClient, infoService, hudiService, mapper, "Focus schedule for " + now);
    }
}

View File

@@ -0,0 +1,62 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
/**
 * Cross-day focus-table schedule: enqueues compaction for focus tables
 * (priority {@code >= 10000}) whose version has not yet been updated,
 * according to {@code InfoService#nonUpdatedVersionTables()}.
 * (The original header was copy-pasted from the daily job.)
 *
 * @author ZhangJiacheng
 * @date 2023-05-11
 */
@DisallowConcurrentExecution
public class FocusUnVersionUpdateScheduleJob extends BaseScheduleJob {
    private static final Logger logger = LoggerFactory.getLogger(FocusUnVersionUpdateScheduleJob.class);

    private final DiscoveryClient discoveryClient;
    private final InfoService infoService;
    private final HudiService hudiService;
    private final ObjectMapper mapper;

    public FocusUnVersionUpdateScheduleJob(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, Jackson2ObjectMapperBuilder builder) {
        this.discoveryClient = discoveryClient;
        this.infoService = infoService;
        this.hudiService = hudiService;
        this.mapper = builder.build();
    }

    /**
     * Schedules focus tables (priority {@code >= 10000}) whose
     * "{flinkJobId}-{alias}" id appears in the non-updated-version table list.
     *
     * @param comment human-readable reason recorded with the schedule
     */
    public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) {
        logger.info("Focus un update version schedule");
        ImmutableList<String> unUpdateVersionTableIds = infoService.nonUpdatedVersionTables();
        ScheduleHelper.schedule(
                discoveryClient,
                infoService,
                hudiService,
                mapper,
                meta -> meta.getPriority() >= 10000
                        && unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getJob().getId(), meta.getAlias())),
                comment,
                Maps.immutable.empty()
        );
    }

    @Override
    protected void executeInternal(JobExecutionContext context) {
        String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        schedule(discoveryClient, infoService, hudiService, mapper, "Focus schedule for " + now);
    }
}

View File

@@ -0,0 +1,60 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
import com.eshore.odcp.hudi.connector.Constants;
import com.eshore.odcp.hudi.connector.utils.TableMetaHelper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.eclipse.collections.api.factory.Maps;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
/**
 * ODS focus-table schedule: enqueues compaction for tables carrying the
 * {@code Constants.TAGS_ODS_FOCUS} tag.
 * (The original header was copy-pasted from the daily job.)
 *
 * @author ZhangJiacheng
 * @date 2023-05-11
 */
@DisallowConcurrentExecution
public class OdsFocusScheduleJob extends BaseScheduleJob {
    private static final Logger logger = LoggerFactory.getLogger(OdsFocusScheduleJob.class);

    private final DiscoveryClient discoveryClient;
    private final InfoService infoService;
    private final HudiService hudiService;
    private final ObjectMapper mapper;

    public OdsFocusScheduleJob(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, Jackson2ObjectMapperBuilder builder) {
        this.discoveryClient = discoveryClient;
        this.infoService = infoService;
        this.hudiService = hudiService;
        this.mapper = builder.build();
    }

    /**
     * Schedules every table tagged with {@code Constants.TAGS_ODS_FOCUS}.
     *
     * @param comment human-readable reason recorded with the schedule
     */
    public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) {
        logger.info("Ods focus schedule");
        ScheduleHelper.schedule(
                discoveryClient,
                infoService,
                hudiService,
                mapper,
                meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_ODS_FOCUS),
                comment,
                Maps.immutable.empty()
        );
    }

    @Override
    protected void executeInternal(JobExecutionContext context) {
        String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        schedule(discoveryClient, infoService, hudiService, mapper, "Ods focus schedule for " + now);
    }
}

View File

@@ -0,0 +1,39 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.compaction;
import com.eshore.odcp.hudi.connector.Constants;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
/**
 * Cleanup job: removes tables whose version has already been updated from the
 * compaction pre-processing queue, so finished tables are not compacted across days.
 * (Original javadoc: 移除已完成表的跨天调度 — "remove cross-day scheduling of completed tables".)
 *
 * @author ZhangJiacheng
 * @date 2023-06-06
 */
@DisallowConcurrentExecution
public class RemoveScheduledJob extends BaseScheduleJob {

    private static final Logger logger = LoggerFactory.getLogger(RemoveScheduledJob.class);

    private final DiscoveryClient discoveryClient;
    private final InfoService infoService;
    private final ObjectMapper mapper;

    public RemoveScheduledJob(DiscoveryClient discoveryClient, InfoService infoService, Jackson2ObjectMapperBuilder builder) {
        this.discoveryClient = discoveryClient;
        this.infoService = infoService;
        // Build a dedicated ObjectMapper from the Spring-configured builder.
        this.mapper = builder.build();
    }

    @Override
    protected void executeInternal(JobExecutionContext context) {
        // Drop every already-updated table from the pre-processing queue.
        QueueUtil.remove(discoveryClient, mapper, Constants.COMPACTION_QUEUE_PRE, infoService.updatedVersionTables());
    }
}

View File

@@ -0,0 +1,30 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster;
import com.eshore.odcp.hudi.connector.Constants;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.stereotype.Component;
/**
 * A4 cluster definition: dispatch is allowed only while the A4 compaction queue holds
 * fewer than 10 pending tasks AND the "ten_iap.datalake" YARN queue is below 80% usage.
 *
 * @author lanyuanxiaoyao
 * @date 2023-06-08
 */
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Component
public class A4Cluster extends Cluster {
    public A4Cluster(DiscoveryClient client, YarnService yarnService) {
        super(
                Constants.CLUSTER_A4,
                Constants.COMPACTION_QUEUE_A4,
                // Both limits must pass before the cluster accepts new work.
                AvailableStrategy.and(
                        new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_A4, 10),
                        new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_A4, "ten_iap.datalake", 0.8)
                )
        );
    }
}

View File

@@ -0,0 +1,30 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster;
import com.eshore.odcp.hudi.connector.Constants;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.stereotype.Component;
/**
 * B12 cluster definition: dispatch is allowed only while the B12 compaction queue holds
 * fewer than 20 pending tasks AND the "default" YARN queue is below 80% usage.
 * (The original javadoc said "A4" — a copy-paste slip; this class configures B12.)
 *
 * @author lanyuanxiaoyao
 * @date 2023-06-08
 */
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Component
public class B12Cluster extends Cluster {
    public B12Cluster(DiscoveryClient client, YarnService yarnService) {
        super(
                Constants.CLUSTER_B12,
                Constants.COMPACTION_QUEUE_B12,
                // Both limits must pass before the cluster accepts new work.
                AvailableStrategy.and(
                        new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_B12, 20),
                        new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_B12, "default", 0.8)
                )
        );
    }
}

View File

@@ -0,0 +1,30 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster;
import com.eshore.odcp.hudi.connector.Constants;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.stereotype.Component;
/**
 * B1 cluster definition: dispatch is allowed only while the B1 compaction queue holds
 * fewer than 20 pending tasks AND the "datalake" YARN queue is not fully used
 * (limit 1.0, i.e. usage strictly below 100%).
 *
 * @author lanyuanxiaoyao
 * @date 2023-06-08
 */
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Component
public class B1Cluster extends Cluster {
    public B1Cluster(DiscoveryClient client, YarnService yarnService) {
        super(
                Constants.CLUSTER_B1,
                Constants.COMPACTION_QUEUE_B1,
                // Both limits must pass before the cluster accepts new work.
                AvailableStrategy.and(
                        new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_B1, 20),
                        new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_B1, "datalake", 1.0)
                )
        );
    }
}

View File

@@ -0,0 +1,36 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster;
import com.eshore.odcp.hudi.connector.Constants;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.stereotype.Component;
/**
 * B5 cluster definition: dispatch is allowed only while the B5 compaction queue holds
 * fewer than 10 pending tasks AND the "ten_iap.datalake" YARN queue is below 90% usage.
 *
 * @author lanyuanxiaoyao
 * @date 2023-06-08
 */
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Component
public class B5Cluster extends Cluster {
    public B5Cluster(DiscoveryClient client, YarnService yarnService) {
        super(
                Constants.CLUSTER_B5,
                Constants.COMPACTION_QUEUE_B5,
                // Both limits must pass before the cluster accepts new work.
                AvailableStrategy.and(
                        new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_B5, 10),
                        new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_B5, "ten_iap.datalake", 0.9)
                        // NOTE(review): disabled time-windowed variant kept for reference — it would
                        // allow 90% usage during 00:00-01:59 and only 50% the rest of the day.
                        // Delete once the fixed 0.9 limit is confirmed as final.
                        /*AvailableStrategy.whether(
                            new DatetimeLimit("* * 0-1 * * ?"),
                            new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_B5, "ten_iap.datalake", 0.9),
                            new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_B5, "ten_iap.datalake", 0.5)
                        )*/
                )
        );
    }
}

View File

@@ -0,0 +1,52 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 集群
*
* @author ZhangJiacheng
* @date 2023-06-08
*/
public class Cluster {
private static final Logger logger = LoggerFactory.getLogger(Cluster.class);
private final String cluster;
private final String queue;
private final AvailableStrategy availableStrategy;
public Cluster(String cluster, String queue, AvailableStrategy availableStrategy) {
this.cluster = cluster;
this.queue = queue;
this.availableStrategy = availableStrategy;
}
public String cluster() {
return cluster;
}
public String queue() {
return queue;
}
public boolean available() {
try {
return availableStrategy.available();
} catch (Throwable throwable) {
logger.error(StrUtil.format("Check cluster {} available fail", this.cluster), throwable);
}
return false;
}
@Override
public String toString() {
return "Cluster{" +
"cluster='" + cluster + '\'' +
", queue='" + queue + '\'' +
", availableStrategy=" + availableStrategy +
'}';
}
}

View File

@@ -0,0 +1,73 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
/**
 * Strategy deciding whether a cluster currently has spare capacity for new work.
 * Composite factories: {@link #and}, {@link #or} and the conditional {@link #whether}.
 * (Original javadoc: 判断集群是否有可用资源的策略.)
 *
 * @author ZhangJiacheng
 * @date 2023-05-31
 */
public interface AvailableStrategy {

    /** All given strategies must pass; evaluation stops at the first failure. */
    static AvailableStrategy and(AvailableStrategy... strategies) {
        return new AndAvailableStrategy(strategies);
    }

    /** At least one strategy must pass; evaluation stops at the first success. */
    static AvailableStrategy or(AvailableStrategy... strategies) {
        return new OrAvailableStrategy(strategies);
    }

    /** Evaluates {@code condition} and delegates to the matching branch. */
    static AvailableStrategy whether(AvailableStrategy condition, AvailableStrategy trueOption, AvailableStrategy falseOption) {
        return new WhetherAvailableStrategy(condition, trueOption, falseOption);
    }

    boolean available();

    /** Shared child-strategy storage for composite strategies. */
    abstract class BaseAvailableStrategy implements AvailableStrategy {
        protected final ImmutableList<AvailableStrategy> strategies;

        public BaseAvailableStrategy(AvailableStrategy... strategies) {
            this.strategies = Lists.immutable.of(strategies);
        }
    }

    /** Logical AND over all child strategies. */
    final class AndAvailableStrategy extends BaseAvailableStrategy {
        public AndAvailableStrategy(AvailableStrategy... strategies) {
            super(strategies);
        }

        @Override
        public boolean available() {
            for (AvailableStrategy strategy : strategies) {
                if (!strategy.available()) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Logical OR over all child strategies. */
    final class OrAvailableStrategy extends BaseAvailableStrategy {
        public OrAvailableStrategy(AvailableStrategy... strategies) {
            super(strategies);
        }

        @Override
        public boolean available() {
            for (AvailableStrategy strategy : strategies) {
                if (strategy.available()) {
                    return true;
                }
            }
            return false;
        }
    }

    /** Conditional strategy: condition ? trueOption : falseOption. */
    final class WhetherAvailableStrategy implements AvailableStrategy {
        private final AvailableStrategy condition;
        private final AvailableStrategy trueOption;
        private final AvailableStrategy falseOption;

        public WhetherAvailableStrategy(AvailableStrategy condition, AvailableStrategy trueOption, AvailableStrategy falseOption) {
            this.condition = condition;
            this.trueOption = trueOption;
            this.falseOption = falseOption;
        }

        @Override
        public boolean available() {
            if (condition.available()) {
                return trueOption.available();
            }
            return falseOption.available();
        }
    }
}

View File

@@ -0,0 +1,28 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import java.time.ZonedDateTime;
/**
 * Time-window limit: available only when the current instant matches the configured
 * Quartz cron expression.
 * (Original javadoc: 时间限制 — "time limit".)
 *
 * @author ZhangJiacheng
 * @date 2023-06-08
 */
public class DatetimeLimit implements AvailableStrategy {

    // Quartz-flavoured cron parser, shared across instances (CronParser is reusable).
    private static final CronParser PARSER =
            new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ));

    private final ExecutionTime executionTime;

    public DatetimeLimit(String cron) {
        executionTime = ExecutionTime.forCron(PARSER.parse(cron));
    }

    @Override
    public boolean available() {
        ZonedDateTime now = ZonedDateTime.now();
        return executionTime.isMatch(now);
    }
}

View File

@@ -0,0 +1,27 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy;
import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
import org.springframework.cloud.client.discovery.DiscoveryClient;
/**
 * Backlog limit: the cluster is available only while the number of tasks waiting in
 * the given queue is strictly below {@code limit}.
 * (Original javadoc: 根据队列剩余任务数限制.)
 *
 * @author ZhangJiacheng
 * @date 2023-06-08
 */
public class QueueSizeLimit implements AvailableStrategy {

    private final DiscoveryClient discoveryClient;
    private final String queue;
    private final Integer limit;

    public QueueSizeLimit(DiscoveryClient discoveryClient, String queue, Integer limit) {
        this.limit = limit;
        this.queue = queue;
        this.discoveryClient = discoveryClient;
    }

    @Override
    public boolean available() {
        // Strictly-below comparison: a queue exactly at the limit is considered full.
        boolean underLimit = QueueUtil.size(discoveryClient, queue) < limit;
        return underLimit;
    }
}

View File

@@ -0,0 +1,34 @@
package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
/**
 * YARN capacity limit: the cluster is available only while the used fraction of the
 * given YARN queue (absolute used / absolute max capacity) is strictly below {@code limit}.
 * (Original javadoc: Yarn 队列剩余资源限制.)
 *
 * @author ZhangJiacheng
 * @date 2023-06-08
 */
public class YarnQueueUsedLimit implements AvailableStrategy {

    private final YarnService yarnService;
    private final String cluster;
    private final String queue;
    private final Double limit;

    public YarnQueueUsedLimit(YarnService yarnService, String cluster, String queue, Double limit) {
        this.limit = limit;
        this.queue = queue;
        this.cluster = cluster;
        this.yarnService = yarnService;
    }

    @Override
    public boolean available() {
        YarnQueue queueInfo = yarnService.queueDetail(cluster, queue);
        // * 1.0 forces floating-point division regardless of the getters' numeric type.
        double usage = queueInfo.getAbsoluteUsedCapacity() * 1.0 / queueInfo.getAbsoluteMaxCapacity();
        return usage < limit;
    }
}

View File

@@ -0,0 +1,24 @@
package com.lanyuanxiaoyao.service.scheduler.strategy;
import org.quartz.JobDetail;
import org.quartz.Trigger;
/**
 * A scheduling strategy: bundles a Quartz job with its trigger plus descriptive metadata.
 * (Original javadoc: 调度策略 — "scheduling strategy".)
 *
 * @author ZhangJiacheng
 * @date 2023-06-06
 */
public interface ScheduleStrategy {

    /** Whether this strategy should be registered at all ({@code ScheduleStrategyImpl} defaults this to true). */
    Boolean enable();

    /** Presumably whether the strategy is exposed in listings (see {@code ScheduleStrategyVO}) — confirm with callers. */
    Boolean show();

    /** Strategy name; {@code ScheduleStrategyImpl} derives job/trigger identities from it, so it should be unique. */
    String name();

    /** Human-readable description of the strategy. */
    String comment();

    /** The Quartz job detail to register. */
    JobDetail job();

    /** The Quartz trigger driving the job. */
    Trigger trigger();
}

View File

@@ -0,0 +1,88 @@
package com.lanyuanxiaoyao.service.scheduler.strategy;
import cn.hutool.core.util.RandomUtil;
import java.time.Instant;
import java.util.Date;
import org.quartz.*;
import static com.eshore.odcp.hudi.connector.Constants.SECOND;
/**
 * Base {@link ScheduleStrategy} carrying a name and comment, plus a {@link #simple}
 * factory that builds a cron-driven strategy from a Quartz job class.
 *
 * @author ZhangJiacheng
 * @date 2023-06-06
 */
public abstract class ScheduleStrategyImpl implements ScheduleStrategy {

    protected final String name;
    protected final String comment;

    protected ScheduleStrategyImpl(String name, String comment) {
        this.name = name;
        this.comment = comment;
    }

    /** Creates a visible ({@code show = true}) cron-driven strategy. */
    public static ScheduleStrategy simple(String name, String comment, Class<? extends Job> job, String cron) {
        return simple(true, name, comment, job, cron);
    }

    /**
     * Creates a cron-driven strategy.
     *
     * @param show    whether the strategy is exposed in listings
     * @param name    unique strategy name; job and trigger identities are derived from it
     * @param comment human-readable description, stored as the job's description
     * @param job     Quartz job class instantiated on each fire
     * @param cron    Quartz cron expression driving the trigger
     */
    public static ScheduleStrategy simple(Boolean show, String name, String comment, Class<? extends Job> job, String cron) {
        return new ScheduleStrategyImpl(name, comment) {
            @Override
            public Boolean show() {
                return show;
            }

            @Override
            public JobDetail job() {
                return generateJob(job);
            }

            @Override
            public Trigger trigger() {
                return generateTrigger(cron);
            }
        };
    }

    @Override
    public Boolean enable() {
        return true;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public String comment() {
        return comment;
    }

    @Override
    public String toString() {
        // Fixed: the label previously read "BaseScheduleStrategy", a stale pre-rename class name.
        return "ScheduleStrategyImpl{" +
                "name='" + name + '\'' +
                ", comment='" + comment + '\'' +
                '}';
    }

    /** Builds the job detail; identity is "<name>_job", description is the strategy comment. */
    protected JobDetail generateJob(Class<? extends Job> job) {
        return JobBuilder.newJob()
                .withIdentity(name + "_job")
                .withDescription(comment)
                .ofType(job)
                .build();
    }

    /** Builds the cron trigger; the cron expression doubles as the trigger description. */
    protected Trigger generateTrigger(String cron) {
        return TriggerBuilder.newTrigger()
                // Fixed: was name + "trigger" (no underscore), inconsistent with the "<name>_job" identity.
                .withIdentity(name + "_trigger")
                .withDescription(cron)
                .withSchedule(
                        CronScheduleBuilder
                                .cronSchedule(cron)
                                .withMisfireHandlingInstructionFireAndProceed())
                // Stagger the first fire by a random 30-60s to avoid a thundering herd at startup.
                .startAt(Date.from(Instant.now().plusMillis(SECOND * RandomUtil.randomInt(30, 60))))
                .build();
    }
}

View File

@@ -0,0 +1,72 @@
package com.lanyuanxiaoyao.service.scheduler.strategy;
import cn.hutool.core.util.ObjectUtil;
/**
 * Read-only view of a {@link ScheduleStrategy} for display, capturing the job/trigger
 * descriptions and the previous/next fire times as epoch milliseconds.
 *
 * @author ZhangJiacheng
 * @date 2023-06-26
 */
public class ScheduleStrategyVO {

    private final String name;
    private final String comment;
    // Description stored on the JobDetail (the strategy comment, per ScheduleStrategyImpl).
    private final String job;
    // Description stored on the Trigger (the cron expression, per ScheduleStrategyImpl).
    private final String trigger;
    // Epoch millis of the previous fire; 0 when the trigger has not fired yet.
    private final Long lastFireTime;
    // Epoch millis of the next planned fire; 0 when no further fire is scheduled.
    private final Long nextFireTime;

    public ScheduleStrategyVO(String name, String comment, String job, String trigger, Long lastFireTime, Long nextFireTime) {
        this.name = name;
        this.comment = comment;
        this.job = job;
        this.trigger = trigger;
        this.lastFireTime = lastFireTime;
        this.nextFireTime = nextFireTime;
    }

    /** Builds the view from a live strategy; null fire times are mapped to 0. */
    public ScheduleStrategyVO(ScheduleStrategy strategy) {
        this(
                strategy.name(),
                strategy.comment(),
                strategy.job().getDescription(),
                strategy.trigger().getDescription(),
                ObjectUtil.isNull(strategy.trigger().getPreviousFireTime()) ? 0 : strategy.trigger().getPreviousFireTime().getTime(),
                ObjectUtil.isNull(strategy.trigger().getNextFireTime()) ? 0 : strategy.trigger().getNextFireTime().getTime()
        );
    }

    public String getName() {
        return name;
    }

    public String getComment() {
        return comment;
    }

    public String getJob() {
        return job;
    }

    public String getTrigger() {
        return trigger;
    }

    public Long getLastFireTime() {
        return lastFireTime;
    }

    public Long getNextFireTime() {
        return nextFireTime;
    }

    @Override
    public String toString() {
        return "ScheduleStrategyVO{" +
                "name='" + name + '\'' +
                ", comment='" + comment + '\'' +
                ", job='" + job + '\'' +
                ", trigger='" + trigger + '\'' +
                ", lastFireTime=" + lastFireTime +
                ", nextFireTime=" + nextFireTime +
                '}';
    }
}

View File

@@ -0,0 +1,149 @@
package com.lanyuanxiaoyao.service.scheduler.utils;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.Constants;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.eshore.odcp.hudi.connector.entity.compaction.ScheduleJob;
import com.eshore.odcp.hudi.connector.utils.TableMetaHelper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.loki4j.slf4j.marker.LabelMarker;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import java.util.Comparator;
import java.util.function.Predicate;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.map.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.springframework.cloud.client.discovery.DiscoveryClient;
/**
 * Compaction scheduling helper: filters table metadata down to compactable MOR tables,
 * prioritises them, and pushes one {@code ScheduleJob} per table onto the
 * pre-processing queue as a single batch.
 * (Original javadoc: 压缩调度工具 — "compaction scheduling utility".)
 *
 * @author lanyuanxiaoyao
 * @date 2023-05-12
 */
public class ScheduleHelper {

    private static final Logger logger = LoggerFactory.getLogger(ScheduleHelper.class);

    /** Loki label marker carrying the Flink job id and table alias for log correlation. */
    private static Marker makeMarker(Long flinkJobId, String alias) {
        return LabelMarker.of(() -> Maps.mutable.of(Constants.LOG_FLINK_JOB_ID, flinkJobId.toString(), Constants.LOG_ALIAS, alias));
    }

    /**
     * Schedules compaction for every MOR table that matches {@code predicate}, has an
     * existing Hudi table and a pending compaction plan, and is not tagged "no compact".
     * Tables are ordered by bucket count, then by last observed compaction duration
     * (longest first), before being queued.
     *
     * @param predicate extra filter applied to each table's metadata
     * @param comment   human-readable note recorded on every scheduled job
     * @param metadata  additional key/value pairs copied onto every queue item
     */
    public static void schedule(
            DiscoveryClient discoveryClient,
            InfoService infoService,
            HudiService hudiService,
            ObjectMapper mapper,
            Predicate<TableMeta> predicate,
            String comment,
            ImmutableMap<String, String> metadata
    ) {
        // One batch id shared by every job scheduled in this invocation.
        String batchId = IdUtil.nanoId(10);
        infoService.tableMetaList()
                // Only MOR tables are compacted
                .select(meta -> StrUtil.equals(Constants.MOR, meta.getHudi().getTargetTableType()))
                .asParallel(ExecutorProvider.EXECUTORS, 1)
                .select(predicate::test)
                // Drop tables whose Hudi table does not exist (or cannot be checked)
                .select(meta -> {
                    try {
                        return hudiService.existsHudiTable(meta.getJob().getId(), meta.getAlias());
                    } catch (Throwable throwable) {
                        logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get hudi status failure", throwable);
                    }
                    return false;
                })
                // Drop tables without a pending compaction plan
                .select(meta -> {
                    try {
                        return hudiService.existsCompactionPlan(meta.getJob().getId(), meta.getAlias());
                    } catch (Throwable throwable) {
                        logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get compaction status failure", throwable);
                    }
                    return false;
                })
                // Reject tables explicitly tagged as "do not compact"
                .reject(meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT))
                .collect(meta -> {
                    long compactionDuration = 0L;
                    try {
                        // Compute the duration of the last compaction run (stays 0 if unknown)
                        SyncState syncState = infoService.syncStateDetail(meta.getJob().getId(), meta.getAlias());
                        if (ObjectUtil.isNotNull(syncState)
                                && ObjectUtil.isNotNull(syncState.getCompactionFinishTime())
                                && ObjectUtil.isNotNull(syncState.getCompactionStartTime())) {
                            compactionDuration = syncState.getCompactionFinishTime() - syncState.getCompactionStartTime();
                        }
                    } catch (Throwable e) {
                        // Fixed: the caught exception was previously dropped; pass it as the final
                        // SLF4J argument so the stack trace is logged alongside the message.
                        logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get sync state failure for {} {}", meta.getJob().getId(), meta.getAlias(), e);
                    }
                    return new TableMetaWrapper(meta, compactionDuration);
                })
                .toSortedList(
                        Comparator
                                // Higher bucket count first
                                .comparing(TableMetaWrapper::getBucketIndexNumber, Comparator.reverseOrder())
                                // Then longer historical compaction duration first
                                .thenComparing(TableMetaWrapper::getCompactionDuration, Comparator.reverseOrder()))
                .collect(meta -> new QueueItem<>(
                        StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias()),
                        metadata.toMap(),
                        meta.getPriority(),
                        ScheduleJob.builder()
                                .id(IdUtil.nanoId(10))
                                .flinkJobId(meta.getFlinkJobId())
                                .alias(meta.getAlias())
                                .batch(batchId)
                                .status(Constants.COMPACTION_STATUS_SCHEDULE)
                                .comment(comment)
                                .build()
                ))
                .forEach(item -> {
                    // Hand the task to the pre-processing queue to await dispatch
                    QueueUtil.add(discoveryClient, mapper, Constants.COMPACTION_QUEUE_PRE, item);
                    if (ObjectUtil.isNotNull(item.getData())) {
                        logger.info(makeMarker(item.getData().getFlinkJobId(), item.getData().getAlias()), "Schedule {}", item);
                    } else {
                        logger.warn("Item data is null, {}", item);
                    }
                });
    }

    /** Pairs a table's metadata with its last observed compaction duration, for sorting. */
    private static final class TableMetaWrapper {
        private final TableMeta tableMeta;
        private final Long compactionDuration;

        private TableMetaWrapper(TableMeta tableMeta, Long compactionDuration) {
            this.tableMeta = tableMeta;
            this.compactionDuration = compactionDuration;
        }

        public Long getFlinkJobId() {
            return tableMeta.getJob().getId();
        }

        public String getAlias() {
            return tableMeta.getAlias();
        }

        public Integer getBucketIndexNumber() {
            return tableMeta.getHudi().getBucketIndexNumber();
        }

        public Integer getPriority() {
            return tableMeta.getPriority();
        }

        public Long getCompactionDuration() {
            return compactionDuration;
        }
    }
}

View File

@@ -0,0 +1,5 @@
spring:
application:
name: service-scheduler
profiles:
include: random-port,common,discovery,metrics,forest

View File

@@ -0,0 +1,52 @@
<configuration>
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<springProperty scope="context" name="LOKI_PUSH_URL" source="loki.url"/>
<springProperty scope="context" name="LOGGING_PARENT" source="logging.parent"/>
<springProperty scope="context" name="APP_NAME" source="spring.application.name"/>
<appender name="Loki" class="com.github.loki4j.logback.Loki4jAppender">
<metricsEnabled>true</metricsEnabled>
<http class="com.github.loki4j.logback.ApacheHttpSender">
<url>${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push}</url>
</http>
<format>
<label>
<pattern>app=${APP_NAME:-none},host=${HOSTNAME:-none},level=%level,job_id=%mdc{LOG_JOB_ID_LABEL:-none},flink_job_id=%mdc{LOG_FLINK_JOB_ID_LABEL:-none},alias=%mdc{LOG_ALIAS_LABEL:-none}</pattern>
<readMarkers>true</readMarkers>
</label>
<message>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx</pattern>
</message>
<sortByTime>true</sortByTime>
</format>
</appender>
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %clr(%5p) %clr([${HOSTNAME}]){yellow} %clr([%t]){magenta} %clr(%logger{40}){cyan} #@# %m%n%wEx</pattern>
</encoder>
</appender>
<appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOGGING_PARENT:-.}/${APP_NAME:-run}.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz</fileNamePattern>
<MaxHistory>7</MaxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx</pattern>
</encoder>
</appender>
<logger name="com.zaxxer.hikari" level="ERROR"/>
<logger name="com.netflix.discovery.shared.resolver.aws.ConfigClusterResolver" level="WARN"/>
<root level="INFO">
<appender-ref ref="Loki"/>
<appender-ref ref="Console"/>
<appender-ref ref="RollingFile"/>
</root>
</configuration>