diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index d8d1a95..eddc1b9 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -138,6 +138,7 @@ public interface Constants { String METRICS_LABEL_TOPIC = "topic"; String METRICS_LABEL_BATCH_ID = "batch_id"; String METRICS_LABEL_ALIAS = "alias"; + String METRICS_LABEL_TAGS = "tags"; String METRICS_LABEL_APPLICATION_ID = "application_id"; String METRICS_STATUS_RUNNING = "running"; diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/configuration/ScheduleStrategyProvider.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/configuration/ScheduleStrategyProvider.java index 7897e37..14370c4 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/configuration/ScheduleStrategyProvider.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/configuration/ScheduleStrategyProvider.java @@ -32,7 +32,7 @@ public class ScheduleStrategyProvider { // ODS重点表调度 ScheduleStrategyImpl.simple("ods_focus_schedule", "ODS 重点表调度", OdsFocusScheduleJob.class, "0 30 23 * * ?"), // CRM重点表调度 - ScheduleStrategyImpl.simple("crm_focus_schedule", "CRM 重点表补充调度", CrmFocusScheduleJob.class, "0 0/10 2,3,4,5,6 * * ?"), + ScheduleStrategyImpl.simple("crm_focus_schedule", "CRM 重点表补充调度", CrmFocusScheduleJob.class, "0 0/15 2,3,4,5,6 * * ?"), // 忙时调度 ScheduleStrategyImpl.simple("focus_schedule", "重点表跨天调度", FocusUnVersionUpdateScheduleJob.class, "0 0,10,20,40 0,1 * * ?"), ScheduleStrategyImpl.simple(false, "remove_scheduled", "跨天调度时及时删除已完成跨天的任务", RemoveScheduledJob.class, "0 * 0,1 * * ?") diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java index 72bbcd2..5cb89c1 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java @@ -43,16 +43,13 @@ public class CrmFocusScheduleJob extends BaseScheduleJob { public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) { logger.info("Crm focus schedule"); - ImmutableList unUpdateVersionTableIds = infoService.nonUpdatedVersionTables(); ScheduleHelper.schedule( discoveryClient, infoService, hudiService, mapper, - meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS) - && unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias())), - comment, - Maps.immutable.of(Constants.SCHEDULE_FORCE, Constants.CLUSTER_A4) + meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS), + comment ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/A4Cluster.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/A4Cluster.java index 3aad938..0cd717d 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/A4Cluster.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/distribute/cluster/A4Cluster.java @@ -3,7 +3,6 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.forest.service.YarnService; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy; -import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.DatetimeLimit; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit; import org.springframework.cloud.client.discovery.DiscoveryClient; @@ -24,8 +23,7 @@ public class A4Cluster extends Cluster { Constants.COMPACTION_QUEUE_A4, AvailableStrategy.and( new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_A4, 10), - new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_A4, "ten_iap.datalake", 0.8), - new DatetimeLimit(false, "* * 2,3,4,5,6 * * ? *") + new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_A4, "ten_iap.datalake", 0.8) ) ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java index 1be807b..29355aa 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java @@ -19,6 +19,7 @@ import java.util.Comparator; import java.util.function.Predicate; import org.eclipse.collections.api.factory.Maps; import org.eclipse.collections.api.map.ImmutableMap; +import org.eclipse.collections.api.map.MutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -106,22 +107,35 @@ public class ScheduleHelper { .comparing(TableMetaWrapper::getBucketNumber, Comparator.reverseOrder()) // 比较压缩耗时,压缩耗时长的在前面 .thenComparing(TableMetaWrapper::getCompactionDuration, Comparator.reverseOrder())) - .collect(meta -> new QueueItem<>( - StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias()), - metadata - .toMap() - .withKeyValue(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getFlinkJobId().toString()) - .withKeyValue(Constants.METRICS_LABEL_ALIAS, meta.getAlias()), - meta.getPriority(), - ScheduleJob.builder() - .id(IdUtil.nanoId(10)) - .flinkJobId(meta.getFlinkJobId()) - .alias(meta.getAlias()) - .batch(batchId) - .status(Constants.COMPACTION_STATUS_SCHEDULE) - .comment(comment) - .build() - )) + .collect(meta -> { + MutableMap finalMetadata = metadata + .toMap() + .withKeyValue(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getFlinkJobId().toString()) + .withKeyValue(Constants.METRICS_LABEL_ALIAS, meta.getAlias()) + .withKeyValue(Constants.METRICS_LABEL_TAGS, meta.getTags()); + + // 统一在这里覆盖特定请求 + // CRM重点表独占A4集群 + if (TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS)) { + finalMetadata.put(Constants.SCHEDULE_FORCE, Constants.CLUSTER_A4); + } else { + finalMetadata.put(Constants.SCHEDULE_ESCAPE, Constants.CLUSTER_A4); + } + + return new QueueItem<>( + StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias()), + finalMetadata, + meta.getPriority(), + ScheduleJob.builder() + .id(IdUtil.nanoId(10)) + .flinkJobId(meta.getFlinkJobId()) + .alias(meta.getAlias()) + .batch(batchId) + .status(Constants.COMPACTION_STATUS_SCHEDULE) + .comment(comment) + .build() + ); + }) .forEach(item -> { // 将任务放入预处理队列等待处理 QueueUtil.add(discoveryClient, mapper, Constants.COMPACTION_QUEUE_PRE, item); @@ -150,6 +164,10 @@ public class ScheduleHelper { return meta.getAlias(); } + public String getTags() { + return meta.getTags(); + } + public Integer getBucketNumber() { return meta.getBucketNumber(); }