diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index f43380a..a34134a 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -224,6 +224,7 @@ public interface Constants { String SCHEDULE_RECOMMEND = "schedule_recommend"; String SCHEDULE_FORCE = "schedule_force"; + String SCHEDULE_ESCAPE = "schedule_escape"; BiFunction FIELD_COVERT = (tableMeta, field) -> { if (TableMeta.SourceType.TELEPG.equals(tableMeta.getSourceType())) { diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/configuration/ScheduleStrategyProvider.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/configuration/ScheduleStrategyProvider.java index 7b51f71..7897e37 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/configuration/ScheduleStrategyProvider.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/configuration/ScheduleStrategyProvider.java @@ -32,7 +32,7 @@ public class ScheduleStrategyProvider { // ODS重点表调度 ScheduleStrategyImpl.simple("ods_focus_schedule", "ODS 重点表调度", OdsFocusScheduleJob.class, "0 30 23 * * ?"), // CRM重点表调度 - ScheduleStrategyImpl.simple("crm_focus_schedule", "CRM 重点表补充调度", CrmFocusScheduleJob.class, "0 15 3,4 * * ?"), + ScheduleStrategyImpl.simple("crm_focus_schedule", "CRM 重点表补充调度", CrmFocusScheduleJob.class, "0 0/10 2,3,4,5,6 * * ?"), // 忙时调度 ScheduleStrategyImpl.simple("focus_schedule", "重点表跨天调度", FocusUnVersionUpdateScheduleJob.class, "0 0,10,20,40 0,1 * * ?"), ScheduleStrategyImpl.simple(false, "remove_scheduled", "跨天调度时及时删除已完成跨天的任务", RemoveScheduledJob.class, "0 * 0,1 * * ?") diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java index 5eb2717..72bbcd2 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.compaction; +import cn.hutool.core.util.StrUtil; import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; @@ -8,6 +9,8 @@ import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.list.ImmutableList; import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; import org.slf4j.Logger; @@ -16,7 +19,7 @@ import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; /** - * 日常普通调度 + * Crm重点表调度 * * @author ZhangJiacheng * @date 2023-05-11 @@ -40,13 +43,16 @@ public class CrmFocusScheduleJob extends BaseScheduleJob { public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) { logger.info("Crm focus schedule"); + ImmutableList unUpdateVersionTableIds = infoService.nonUpdatedVersionTables(); ScheduleHelper.schedule( discoveryClient, infoService, hudiService, mapper, - meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS), - comment + meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS) + && unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias())), + comment, + Maps.immutable.of(Constants.SCHEDULE_FORCE, Constants.CLUSTER_A4) ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DistributeScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DistributeScheduleJob.java index 0861c96..6f67b02 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DistributeScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/DistributeScheduleJob.java @@ -67,14 +67,18 @@ public class DistributeScheduleJob extends BaseScheduleJob { if (metadata.containsKey(Constants.SCHEDULE_RECOMMEND)) { String recommendCluster = metadata.get(Constants.SCHEDULE_RECOMMEND); Optional cluster = clusters.select(s -> StrUtil.equals(recommendCluster, s.cluster())).getFirstOptional(); - if (cluster.isPresent() && cluster.get().available()) { + if (cluster.isPresent() && cluster.get().available(metadata)) { return cluster.get().queue(); } else { logger.warn(StrUtil.format("{} cluster not found or busy")); } } for (Cluster cluster : clusters) { - if (cluster.available()) { + if (cluster.available(metadata)) { + String escapeCluster = metadata.getOrDefault(Constants.SCHEDULE_ESCAPE, null); + if (StrUtil.isNotBlank(escapeCluster) && StrUtil.equals(escapeCluster, cluster.cluster())) { + continue; + } return cluster.queue(); } } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/A4Cluster.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/A4Cluster.java index 0cd717d..3aad938 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/A4Cluster.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/A4Cluster.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.forest.service.YarnService; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy; +import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.DatetimeLimit; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit; import org.springframework.cloud.client.discovery.DiscoveryClient; @@ -23,7 +24,8 @@ public class A4Cluster extends Cluster { Constants.COMPACTION_QUEUE_A4, AvailableStrategy.and( new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_A4, 10), - new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_A4, "ten_iap.datalake", 0.8) + new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_A4, "ten_iap.datalake", 0.8), + new DatetimeLimit(false, "* * 2,3,4,5,6 * * ? *") ) ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B5Cluster.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B5Cluster.java index ffd06c6..c5204eb 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B5Cluster.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/B5Cluster.java @@ -22,7 +22,7 @@ public class B5Cluster extends Cluster { new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_B5, 10), new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_B5, "ten_iap.datalake", 0.9) ) */ - () -> false + metadata -> false ); } } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/Cluster.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/Cluster.java index 1b2ed44..6ba16c9 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/Cluster.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/Cluster.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy; +import org.eclipse.collections.api.map.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,12 +38,12 @@ public class Cluster { return queue; } - public boolean available() { + public boolean available(ImmutableMap metadata) { try { if (ObjectUtil.isNull(availableStrategy)) { return true; } - return availableStrategy.available(); + return availableStrategy.available(metadata); } catch (Throwable throwable) { logger.error(StrUtil.format("Check cluster {} available fail", this.cluster), throwable); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/AvailableStrategy.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/AvailableStrategy.java index 6427ee8..46c9478 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/AvailableStrategy.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/AvailableStrategy.java @@ -2,6 +2,7 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.map.ImmutableMap; /** * 判断集群是否有可用资源的策略 @@ -22,7 +23,7 @@ public interface AvailableStrategy { return new WhetherAvailableStrategy(condition, trueOption, falseOption); } - boolean available(); + boolean available(ImmutableMap metadata); abstract class BaseAvailableStrategy implements AvailableStrategy { protected final ImmutableList strategies; @@ -38,8 +39,8 @@ public interface AvailableStrategy { } @Override - public boolean available() { - return strategies.allSatisfy(AvailableStrategy::available); + public boolean available(ImmutableMap metadata) { + return strategies.allSatisfy(strategy -> strategy.available(metadata)); } } @@ -49,8 +50,8 @@ public interface AvailableStrategy { } @Override - public boolean available() { - return strategies.anySatisfy(AvailableStrategy::available); + public boolean available(ImmutableMap metadata) { + return strategies.anySatisfy(strategy -> strategy.available(metadata)); } } @@ -66,8 +67,8 @@ public interface AvailableStrategy { } @Override - public boolean available() { - return condition.available() ? trueOption.available() : falseOption.available(); + public boolean available(ImmutableMap metadata) { + return condition.available(metadata) ? trueOption.available(metadata) : falseOption.available(metadata); } } } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/DatetimeLimit.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/DatetimeLimit.java index 6a6cc10..ba4d101 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/DatetimeLimit.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/DatetimeLimit.java @@ -6,6 +6,7 @@ import com.cronutils.model.time.ExecutionTime; import com.cronutils.parser.CronParser; import java.time.ZoneId; import java.time.ZonedDateTime; +import org.eclipse.collections.api.map.ImmutableMap; /** * 时间限制 @@ -25,7 +26,7 @@ public class DatetimeLimit implements AvailableStrategy { } @Override - public boolean available() { + public boolean available(ImmutableMap metadata) { return runInPeriod == execution.isMatch(ZonedDateTime.now().withNano(0)); } } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/MetadataLimit.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/MetadataLimit.java new file mode 100644 index 0000000..1bf9495 --- /dev/null +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/MetadataLimit.java @@ -0,0 +1,22 @@ +package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy; + +import java.util.function.Predicate; +import org.eclipse.collections.api.map.ImmutableMap; + +/** + * 通过Metadata信息限制调度 + * + * @author lanyuanxiaoyao + */ +public class MetadataLimit implements AvailableStrategy { + private final Predicate> predicate; + + public MetadataLimit(Predicate> predicate) { + this.predicate = predicate; + } + + @Override + public boolean available(ImmutableMap metadata) { + return predicate.test(metadata); + } +} diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/QueueSizeLimit.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/QueueSizeLimit.java index d87c89f..24e0557 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/QueueSizeLimit.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/QueueSizeLimit.java @@ -1,6 +1,7 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy; import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil; +import org.eclipse.collections.api.map.ImmutableMap; import org.springframework.cloud.client.discovery.DiscoveryClient; /** @@ -21,7 +22,7 @@ public class QueueSizeLimit implements AvailableStrategy { } @Override - public boolean available() { + public boolean available(ImmutableMap metadata) { return QueueUtil.size(discoveryClient, queue) < limit; } } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/YarnQueueUsedLimit.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/YarnQueueUsedLimit.java index 73ed210..f7cc8fa 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/YarnQueueUsedLimit.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/strategy/YarnQueueUsedLimit.java @@ -2,6 +2,7 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; import com.lanyuanxiaoyao.service.forest.service.YarnService; +import org.eclipse.collections.api.map.ImmutableMap; /** * Yarn 队列剩余资源限制 @@ -28,7 +29,7 @@ public class YarnQueueUsedLimit implements AvailableStrategy { } @Override - public boolean available() { + public boolean available(ImmutableMap metadata) { return queueUsed(cluster, queue) < limit; } } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java index f611085..1be807b 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java @@ -108,7 +108,10 @@ public class ScheduleHelper { .thenComparing(TableMetaWrapper::getCompactionDuration, Comparator.reverseOrder())) .collect(meta -> new QueueItem<>( StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias()), - metadata.toMap(), + metadata + .toMap() + .withKeyValue(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getFlinkJobId().toString()) + .withKeyValue(Constants.METRICS_LABEL_ALIAS, meta.getAlias()), meta.getPriority(), ScheduleJob.builder() .id(IdUtil.nanoId(10)) diff --git a/service-scheduler/src/test/java/com/lanyuanxiaoyao/service/scheduler/TestCache.java b/service-scheduler/src/test/java/com/lanyuanxiaoyao/service/scheduler/TestCache.java new file mode 100644 index 0000000..be643ea --- /dev/null +++ b/service-scheduler/src/test/java/com/lanyuanxiaoyao/service/scheduler/TestCache.java @@ -0,0 +1,36 @@ +package com.lanyuanxiaoyao.service.scheduler; + +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * @author lanyuanxiaoyao + */ +public class TestCache { + public static void main(String[] args) { + CacheLoaderImpl loader = new CacheLoaderImpl(); + LoadingCache cache = Caffeine.newBuilder() + .build(loader); + loader.setBindCache(cache); + System.out.println(cache.asMap()); + System.out.println(cache.get("one")); + System.out.println(cache.asMap()); + } + + public static class CacheLoaderImpl implements CacheLoader { + private LoadingCache bindCache; + + public void setBindCache(LoadingCache bindCache) { + this.bindCache = bindCache; + } + + @Override + public @Nullable Boolean load(@NonNull String key) throws Exception { + bindCache.put("two", false); + return true; + } + } +} diff --git a/service-scheduler/src/test/java/com/lanyuanxiaoyao/service/scheduler/TestStrategy.java b/service-scheduler/src/test/java/com/lanyuanxiaoyao/service/scheduler/TestStrategy.java index 26c63c6..2107ce9 100644 --- a/service-scheduler/src/test/java/com/lanyuanxiaoyao/service/scheduler/TestStrategy.java +++ b/service-scheduler/src/test/java/com/lanyuanxiaoyao/service/scheduler/TestStrategy.java @@ -2,6 +2,7 @@ package com.lanyuanxiaoyao.service.scheduler; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.DatetimeLimit; +import org.eclipse.collections.api.factory.Maps; /** * @author lanyuanxiaoyao @@ -10,8 +11,8 @@ import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.DatetimeL public class TestStrategy { public static void main(String[] args) { AvailableStrategy strategy = AvailableStrategy.and( - new DatetimeLimit(false, "* * 7-19 28-30 4 ? 2024") + new DatetimeLimit(false, "* * 2,3,4,5,6 * * ? *") ); - System.out.println(strategy.available()); + System.out.println(strategy.available(Maps.immutable.empty())); } }