feat(scheduler): crm重点表独占a4调度

This commit is contained in:
v-zhangjc9
2024-07-10 18:20:29 +08:00
parent aaa243d626
commit cb4a21ab59
5 changed files with 39 additions and 25 deletions

View File

@@ -138,6 +138,7 @@ public interface Constants {
String METRICS_LABEL_TOPIC = "topic"; String METRICS_LABEL_TOPIC = "topic";
String METRICS_LABEL_BATCH_ID = "batch_id"; String METRICS_LABEL_BATCH_ID = "batch_id";
String METRICS_LABEL_ALIAS = "alias"; String METRICS_LABEL_ALIAS = "alias";
String METRICS_LABEL_TAGS = "tags";
String METRICS_LABEL_APPLICATION_ID = "application_id"; String METRICS_LABEL_APPLICATION_ID = "application_id";
String METRICS_STATUS_RUNNING = "running"; String METRICS_STATUS_RUNNING = "running";

View File

@@ -32,7 +32,7 @@ public class ScheduleStrategyProvider {
// ODS重点表调度 // ODS重点表调度
ScheduleStrategyImpl.simple("ods_focus_schedule", "ODS 重点表调度", OdsFocusScheduleJob.class, "0 30 23 * * ?"), ScheduleStrategyImpl.simple("ods_focus_schedule", "ODS 重点表调度", OdsFocusScheduleJob.class, "0 30 23 * * ?"),
// CRM重点表调度 // CRM重点表调度
ScheduleStrategyImpl.simple("crm_focus_schedule", "CRM 重点表补充调度", CrmFocusScheduleJob.class, "0 0/10 2,3,4,5,6 * * ?"), ScheduleStrategyImpl.simple("crm_focus_schedule", "CRM 重点表补充调度", CrmFocusScheduleJob.class, "0 0/15 2,3,4,5,6 * * ?"),
// 忙时调度 // 忙时调度
ScheduleStrategyImpl.simple("focus_schedule", "重点表跨天调度", FocusUnVersionUpdateScheduleJob.class, "0 0,10,20,40 0,1 * * ?"), ScheduleStrategyImpl.simple("focus_schedule", "重点表跨天调度", FocusUnVersionUpdateScheduleJob.class, "0 0,10,20,40 0,1 * * ?"),
ScheduleStrategyImpl.simple(false, "remove_scheduled", "跨天调度时及时删除已完成跨天的任务", RemoveScheduledJob.class, "0 * 0,1 * * ?") ScheduleStrategyImpl.simple(false, "remove_scheduled", "跨天调度时及时删除已完成跨天的任务", RemoveScheduledJob.class, "0 * 0,1 * * ?")

View File

@@ -43,16 +43,13 @@ public class CrmFocusScheduleJob extends BaseScheduleJob {
public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) { public static void schedule(DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ObjectMapper mapper, String comment) {
logger.info("Crm focus schedule"); logger.info("Crm focus schedule");
ImmutableList<String> unUpdateVersionTableIds = infoService.nonUpdatedVersionTables();
ScheduleHelper.schedule( ScheduleHelper.schedule(
discoveryClient, discoveryClient,
infoService, infoService,
hudiService, hudiService,
mapper, mapper,
meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS) meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS),
&& unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias())), comment
comment,
Maps.immutable.of(Constants.SCHEDULE_FORCE, Constants.CLUSTER_A4)
); );
} }

View File

@@ -3,7 +3,6 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.distribute.cluster;
import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.forest.service.YarnService; import com.lanyuanxiaoyao.service.forest.service.YarnService;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.AvailableStrategy;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.DatetimeLimit;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.QueueSizeLimit;
import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit; import com.lanyuanxiaoyao.service.scheduler.quartz.distribute.strategy.YarnQueueUsedLimit;
import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.DiscoveryClient;
@@ -24,8 +23,7 @@ public class A4Cluster extends Cluster {
Constants.COMPACTION_QUEUE_A4, Constants.COMPACTION_QUEUE_A4,
AvailableStrategy.and( AvailableStrategy.and(
new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_A4, 10), new QueueSizeLimit(client, Constants.COMPACTION_QUEUE_A4, 10),
new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_A4, "ten_iap.datalake", 0.8), new YarnQueueUsedLimit(yarnService, Constants.CLUSTER_A4, "ten_iap.datalake", 0.8)
new DatetimeLimit(false, "* * 2,3,4,5,6 * * ? *")
) )
); );
} }

View File

@@ -19,6 +19,7 @@ import java.util.Comparator;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.eclipse.collections.api.factory.Maps; import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.map.ImmutableMap; import org.eclipse.collections.api.map.ImmutableMap;
import org.eclipse.collections.api.map.MutableMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.Marker; import org.slf4j.Marker;
@@ -106,12 +107,24 @@ public class ScheduleHelper {
.comparing(TableMetaWrapper::getBucketNumber, Comparator.reverseOrder()) .comparing(TableMetaWrapper::getBucketNumber, Comparator.reverseOrder())
// 比较压缩耗时,压缩耗时长的在前面 // 比较压缩耗时,压缩耗时长的在前面
.thenComparing(TableMetaWrapper::getCompactionDuration, Comparator.reverseOrder())) .thenComparing(TableMetaWrapper::getCompactionDuration, Comparator.reverseOrder()))
.collect(meta -> new QueueItem<>( .collect(meta -> {
StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias()), MutableMap<String, String> finalMetadata = metadata
metadata
.toMap() .toMap()
.withKeyValue(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getFlinkJobId().toString()) .withKeyValue(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getFlinkJobId().toString())
.withKeyValue(Constants.METRICS_LABEL_ALIAS, meta.getAlias()), .withKeyValue(Constants.METRICS_LABEL_ALIAS, meta.getAlias())
.withKeyValue(Constants.METRICS_LABEL_TAGS, meta.getTags());
// 统一在这里覆盖特定请求
// CRM重点表独占A4集群
if (TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS)) {
finalMetadata.put(Constants.SCHEDULE_FORCE, Constants.CLUSTER_A4);
} else {
finalMetadata.put(Constants.SCHEDULE_ESCAPE, Constants.CLUSTER_A4);
}
return new QueueItem<>(
StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias()),
finalMetadata,
meta.getPriority(), meta.getPriority(),
ScheduleJob.builder() ScheduleJob.builder()
.id(IdUtil.nanoId(10)) .id(IdUtil.nanoId(10))
@@ -121,7 +134,8 @@ public class ScheduleHelper {
.status(Constants.COMPACTION_STATUS_SCHEDULE) .status(Constants.COMPACTION_STATUS_SCHEDULE)
.comment(comment) .comment(comment)
.build() .build()
)) );
})
.forEach(item -> { .forEach(item -> {
// 将任务放入预处理队列等待处理 // 将任务放入预处理队列等待处理
QueueUtil.add(discoveryClient, mapper, Constants.COMPACTION_QUEUE_PRE, item); QueueUtil.add(discoveryClient, mapper, Constants.COMPACTION_QUEUE_PRE, item);
@@ -150,6 +164,10 @@ public class ScheduleHelper {
return meta.getAlias(); return meta.getAlias();
} }
public String getTags() {
return meta.getTags();
}
public Integer getBucketNumber() { public Integer getBucketNumber() {
return meta.getBucketNumber(); return meta.getBucketNumber();
} }