perf(scheduler): 优化批量压缩调度接口

This commit is contained in:
v-zhangjc9
2024-05-08 18:35:26 +08:00
parent 7f884a5554
commit 103bde5cdc
8 changed files with 41 additions and 9 deletions

View File

@@ -1,8 +1,11 @@
package com.lanyuanxiaoyao.service.forest.service; package com.lanyuanxiaoyao.service.forest.service;
import com.dtflys.forest.annotation.BaseRequest; import com.dtflys.forest.annotation.BaseRequest;
import com.dtflys.forest.annotation.Body;
import com.dtflys.forest.annotation.Get; import com.dtflys.forest.annotation.Get;
import com.dtflys.forest.annotation.Post;
import com.dtflys.forest.annotation.Query; import com.dtflys.forest.annotation.Query;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.schedule.ScheduleStrategy; import com.lanyuanxiaoyao.service.configuration.entity.schedule.ScheduleStrategy;
import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.list.ImmutableList;
@@ -36,6 +39,9 @@ public interface ScheduleService {
@Get("/table") @Get("/table")
void scheduleTableRecommend(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("recommend") String recommendCluster); void scheduleTableRecommend(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("recommend") String recommendCluster);
@Post(value = "/tables", contentType = "application/json")
void scheduleTables(@Body ImmutableList<JobIdAndAlias> list);
@Get("/stop_all") @Get("/stop_all")
void stopAll(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("disable_meta") Boolean disableMeta); void stopAll(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("disable_meta") Boolean disableMeta);
} }

View File

@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.LoadingCache;
import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil; import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
import com.lanyuanxiaoyao.service.forest.service.HudiService; import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.forest.service.InfoService;
@@ -34,6 +35,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
@@ -141,6 +144,22 @@ public class ScheduleController {
); );
} }
@PostMapping("tables")
public void scheduleTables(@RequestBody ImmutableList<JobIdAndAlias> list) {
if (list.isEmpty()) {
return;
}
ImmutableList<String> keys = list.collect(item -> StrUtil.format("{}-{}", item.getId(), item.getAlias()));
ScheduleHelper.schedule(
discoveryClient,
infoService,
hudiService,
mapper,
meta -> keys.contains(StrUtil.format("{}-{}", meta.getJob().getId(), meta.getAlias())),
"Schedule manually"
);
}
@GetMapping("stop_all") @GetMapping("stop_all")
public void stopTheWorld( public void stopTheWorld(
@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("flink_job_id") Long flinkJobId,

View File

@@ -45,8 +45,7 @@ public class DailyScheduleJob extends BaseScheduleJob {
hudiService, hudiService,
mapper, mapper,
meta -> true, meta -> true,
comment, comment
Maps.immutable.empty()
); );
} }

View File

@@ -45,8 +45,7 @@ public class FocusScheduleJob extends BaseScheduleJob {
hudiService, hudiService,
mapper, mapper,
meta -> meta.getPriority() >= 10000, meta -> meta.getPriority() >= 10000,
comment, comment
Maps.immutable.empty()
); );
} }

View File

@@ -49,8 +49,7 @@ public class FocusUnVersionUpdateScheduleJob extends BaseScheduleJob {
mapper, mapper,
meta -> meta.getPriority() >= 10000 meta -> meta.getPriority() >= 10000
&& unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getJob().getId(), meta.getAlias())), && unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getJob().getId(), meta.getAlias())),
comment, comment
Maps.immutable.empty()
); );
} }

View File

@@ -47,8 +47,7 @@ public class OdsFocusScheduleJob extends BaseScheduleJob {
hudiService, hudiService,
mapper, mapper,
meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_ODS_FOCUS), meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_ODS_FOCUS),
comment, comment
Maps.immutable.empty()
); );
} }

View File

@@ -37,6 +37,17 @@ public class ScheduleHelper {
return LabelMarker.of(() -> Maps.mutable.of(Constants.LOG_FLINK_JOB_ID, flinkJobId.toString(), Constants.LOG_ALIAS, alias)); return LabelMarker.of(() -> Maps.mutable.of(Constants.LOG_FLINK_JOB_ID, flinkJobId.toString(), Constants.LOG_ALIAS, alias));
} }
public static void schedule(
DiscoveryClient discoveryClient,
InfoService infoService,
HudiService hudiService,
ObjectMapper mapper,
Predicate<TableMeta> predicate,
String comment
) {
schedule(discoveryClient, infoService, hudiService, mapper, predicate, comment, Maps.immutable.empty());
}
public static void schedule( public static void schedule(
DiscoveryClient discoveryClient, DiscoveryClient discoveryClient,
InfoService infoService, InfoService infoService,

View File

@@ -47,7 +47,7 @@ public class ScheduleController extends BaseController {
for (JobIdAndAlias jobIdAndAlias : jobIdAndAliases) { for (JobIdAndAlias jobIdAndAlias : jobIdAndAliases) {
checkTableExists(infoService, jobIdAndAlias.getId(), jobIdAndAlias.getAlias()); checkTableExists(infoService, jobIdAndAlias.getId(), jobIdAndAlias.getAlias());
} }
jobIdAndAliases.forEach(item -> scheduleService.scheduleTable(item.getId(), item.getAlias())); scheduleService.scheduleTables(jobIdAndAliases);
return AmisResponse.responseSuccess(); return AmisResponse.responseSuccess();
} }