From 053a9222cdc978662b9175c9e4d5e44e3c0559a7 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Fri, 26 Apr 2024 10:16:11 +0800 Subject: [PATCH] =?UTF-8?q?feat(launcher):=20=E5=A2=9E=E5=8A=A0=E5=8D=95?= =?UTF-8?q?=E7=8B=AC=E6=8C=87=E5=AE=9A=E9=9B=86=E7=BE=A4=E8=BF=9B=E8=A1=8C?= =?UTF-8?q?=E6=89=8B=E5=8A=A8=E5=8E=8B=E7=BC=A9=E7=9A=84=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../command/commands/CompactionCommand.java | 45 ++++- .../service/launcher/LauncherService.java | 3 + .../service/launcher/LaunchersService.java | 5 + .../controller/CompactionController.java | 15 +- .../compaction/service/CompactionService.java | 162 +++++++++--------- 5 files changed, 143 insertions(+), 87 deletions(-) diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/CompactionCommand.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/CompactionCommand.java index d00a2ba..bdea9a1 100644 --- a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/CompactionCommand.java +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/CompactionCommand.java @@ -1,12 +1,15 @@ package com.lanyuanxiaoyao.service.command.commands; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.command.utils.CommandLineUtils; import com.lanyuanxiaoyao.service.command.utils.TableUtils; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.TableMeta; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnClusters; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.forest.service.ScheduleService; +import com.lanyuanxiaoyao.service.forest.service.launcher.LaunchersService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.shell.standard.ShellComponent; @@ -27,10 +30,14 @@ public class CompactionCommand extends AbstractUtilShellComponent { private final InfoService infoService; private final ScheduleService scheduleService; + private final YarnClusters yarnClusters; + private final LaunchersService launchersService; - public CompactionCommand(InfoService infoService, ScheduleService scheduleService) { + public CompactionCommand(InfoService infoService, ScheduleService scheduleService, YarnClusters yarnClusters, LaunchersService launchersService) { this.infoService = infoService; this.scheduleService = scheduleService; + this.yarnClusters = yarnClusters; + this.launchersService = launchersService; } @ShellMethod("启动表压缩任务") @@ -59,4 +66,40 @@ public class CompactionCommand extends AbstractUtilShellComponent { } ); } + + @ShellMethod("启动表压缩任务") + public String compactionDryRun( + @ShellOption( + help = "集群", + defaultValue = "" + ) String cluster, + @ShellOption(help = "Flink job id") Long flinkJobId, + @ShellOption(help = "别名") String alias, + @ShellOption( + help = "Ignore double check", + defaultValue = "false" + ) Boolean ignoreCheck + ) { + if (StrUtil.isBlank(cluster)) { + cluster = yarnClusters.getDefaultCompactionCluster(); + logger.info("Use default compaction cluster: {}", cluster); + } + String targetCluster = cluster; + TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); + return CommandLineUtils.generateResultLines( + () -> { + if (ObjectUtil.isEmpty(meta)) { + return "没有找到指定的表信息"; + } else { + System.out.println(TableUtils.makeTableMeta(meta)); + if (doubleCheck(RUN_CONFIRMATION_MESSAGE, ignoreCheck)) { + launchersService.compactionStart(targetCluster, flinkJobId, alias); + return Constants.OPERATION_DONE; + } else { + return Constants.OPERATION_CANCEL; + } + } + } + ); + } } diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/LauncherService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/LauncherService.java index b99cfa7..8837ab6 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/LauncherService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/LauncherService.java @@ -14,6 +14,9 @@ public interface LauncherService { @Get("/launcher/synchronizer/stop") void syncStop(@Query("flink_job_id") Long flinkJobId); + @Get("/launcher/compaction/start") + void compactionStart(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); + @Get("/launcher/compaction/stop") void compactionStop(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/LaunchersService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/LaunchersService.java index fefe03d..05d6931 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/LaunchersService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/LaunchersService.java @@ -46,6 +46,11 @@ public class LaunchersService { return serviceMap.valuesView().toList().toImmutable(); } + public void compactionStart(String cluster, Long flinkJobId, String alias) { + LauncherService service = getService(cluster); + service.compactionStart(flinkJobId, alias); + } + public void compactionStop(Long flinkJobId, String alias) { for (LauncherService service : getServices()) { service.compactionStop(flinkJobId, alias); diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/controller/CompactionController.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/controller/CompactionController.java index 1a7546b..0a6b6cd 100644 --- a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/controller/CompactionController.java +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/controller/CompactionController.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.launcher.compaction.controller; +import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.common.Constants; @@ -8,6 +9,7 @@ import com.lanyuanxiaoyao.service.common.entity.SyncState; import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.forest.service.ZookeeperService; +import com.lanyuanxiaoyao.service.launcher.compaction.service.CompactionService; import com.lanyuanxiaoyao.service.launcher.configuration.ClusterConfiguration; import java.io.IOException; import java.util.EnumSet; @@ -41,13 +43,15 @@ public class CompactionController { private final ClusterConfiguration clusterConfiguration; private final InfoService infoService; private final ZookeeperService zookeeperService; + private final CompactionService compactionService; private final YarnClient yarnClient; - public CompactionController(DiscoveryClient discoveryClient, ClusterConfiguration clusterConfiguration, InfoService infoService, ZookeeperService zookeeperService) { + public CompactionController(DiscoveryClient discoveryClient, ClusterConfiguration clusterConfiguration, InfoService infoService, ZookeeperService zookeeperService, CompactionService compactionService) { this.discoveryClient = discoveryClient; this.clusterConfiguration = clusterConfiguration; this.infoService = infoService; this.zookeeperService = zookeeperService; + this.compactionService = compactionService; yarnClient = YarnClient.createYarnClient(); yarnClient.init(new Configuration()); @@ -62,6 +66,15 @@ public class CompactionController { } } + @GetMapping("start") + public void start( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias + ) throws Exception { + logger.info("Enter method: start[flinkJobId, alias]. " + "flinkJobId:" + flinkJobId + "," + "alias:" + alias); + compactionService.compact(IdUtil.nanoId(10), flinkJobId, alias); + } + @GetMapping("stop") public void stop( @RequestParam("flink_job_id") Long flinkJobId, diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java index d3fbe5e..8dfd7c1 100644 --- a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java @@ -3,7 +3,6 @@ package com.lanyuanxiaoyao.service.launcher.compaction.service; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.FlinkJob; @@ -99,7 +98,7 @@ public class CompactionService { java.util.concurrent.ExecutorService threadPool = Executors.newWorkStealingPool(5); if (Boolean.FALSE.equals(QueueUtil.isEmpty(discoveryClient, clusterConfiguration.getCompactionQueueName()))) { for (int index = 0; index < 5; index++) { - threadPool.submit(this::compact); + threadPool.submit(() -> compact()); } threadPool.shutdown(); while (!threadPool.isTerminated()) { @@ -124,75 +123,21 @@ public class CompactionService { ScheduleJob job = item.getData(); LogHelper.setMdcFlinkJobAndAlias(job.getFlinkJobId(), job.getAlias()); logger.info("Receive job[{}]({}): {}", item.getTraceId(), item.getCreateTime(), item.getData()); - // 构造任务相关的锁 - String lockPath = NameHelper.compactionLauncherLockPath(job.getFlinkJobId(), job.getAlias()); - InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath); try { - if (lock.acquire(2, TimeUnit.SECONDS)) { - Stat stat = zookeeperClient.checkExists().forPath(NameHelper.compactionRunningLockPath(job.getFlinkJobId(), job.getAlias())); - if (ObjectUtil.isNotNull(stat)) { - logger.info("Job {} {} is running", job.getFlinkJobId(), job.getAlias()); - // 运行中的任务放在持有容器中 - holder.add(item); - // 进入下一轮,由于最外层有一个 finally,所以直接 continue 也会尝试获取新的任务 - continue; - } - FlinkJob flinkJob = infoService.flinkJobDetail(job.getFlinkJobId()); - TableMeta meta = infoService.tableMetaDetail(job.getFlinkJobId(), job.getAlias()); - if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) { - logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias()); - clearHolder(holder); - continue; - } - logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias()); - // 判断是否存在 Hudi 表,提前结束掉 - if (!hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias())) { - logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias()); - clearHolder(holder); - continue; - } - // 获取待压缩的时间点 - ImmutableList selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias()); - if (ObjectUtil.isEmpty(selectedInstants)) { - logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias()); - clearHolder(holder); - continue; - } - logger.info("[{}] [{}] Selected Instants: {}", flinkJob.getId(), meta.getAlias(), selectedInstants.makeString(",")); - // 计算待压缩的文件数 - long count = predictCompactFileCount(meta, selectedInstants); - if (ObjectUtil.isNotNull(count)) { - // 根据待压缩的文件数计算并行度 - long parallelism = predictParallelism(count); - logger.info("[{}] [{}] Predict compact files: {} {}", flinkJob.getId(), meta.getAlias(), count, parallelism); - meta.getHudi().setCompactionTasks((int) parallelism); - } - logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias()); - String applicationId = Failsafe.with(RETRY_POLICY) - .get(() -> executorService.runCompaction( - job.getBatch(), - flinkJob, - meta, - hadoopConfiguration.getKerberosKeytabPath(), - hadoopConfiguration.getKerberosPrincipal(), - selectedInstants.collect(HudiInstant::getTimestamp).makeString(","), - clusterConfiguration.getCluster() - ).toString()); - Failsafe.with(RETRY_POLICY) - .run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId)); - clearHolder(holder); - } else { - logger.warn("Un acquire lock for " + item.getId()); - holder.add(item); - } + compact(job.getBatch(), job.getFlinkJobId(), job.getAlias()); + clearHolder(holder); + } catch (JobCannotRunningException e) { + // 运行中的任务放在持有容器中 + holder.add(item); + // 进入下一轮,由于最外层有一个 finally,所以直接 continue 也会尝试获取新的任务 } catch (Exception e) { - logger.warn(StrUtil.format("[{}] [{}] Try lock something wrong ", job.getFlinkJobId(), job.getAlias()), e); + logger.warn(StrUtil.format("[{}] [{}] Try compaction wrong ", job.getFlinkJobId(), job.getAlias()), e); String failCount = item.getMetadata(Constants.SCHEDULE_JOB_FAIL_COUNT); if (StrUtil.isNotBlank(failCount)) { int fail = Integer.parseInt(failCount); if (fail > 5) { logger.error("Job {} cause unaccepted error", item); - continue; + return; } else { item.getMetadata().put(Constants.SCHEDULE_JOB_FAIL_COUNT, String.valueOf(fail + 1)); } @@ -200,15 +145,6 @@ public class CompactionService { item.getMetadata().put(Constants.SCHEDULE_JOB_FAIL_COUNT, "1"); } QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, item); - } finally { - // 无论如何,尝试解锁 - try { - if (lock.isAcquiredInThisProcess()) { - lock.release(); - } - } catch (Exception e) { - logger.error("Release lock failure " + lockPath, e); - } } } else { logger.warn("Schedule job is empty. [{}]({}): {}", item.getTraceId(), item.getCreateTime(), item); @@ -222,6 +158,73 @@ public class CompactionService { clearHolder(holder); } + public void compact(String batch, Long flinkJobId, String alias) throws Exception { + // 构造任务相关的锁 + String lockPath = NameHelper.compactionLauncherLockPath(flinkJobId, alias); + InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath); + try { + if (lock.acquire(2, TimeUnit.SECONDS)) { + Stat stat = zookeeperClient.checkExists().forPath(NameHelper.compactionRunningLockPath(flinkJobId, alias)); + if (ObjectUtil.isNotNull(stat)) { + logger.info("Job {} {} is running", flinkJobId, alias); + throw new JobCannotRunningException(); + } + FlinkJob flinkJob = infoService.flinkJobDetail(flinkJobId); + TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); + if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) { + logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias()); + return; + } + logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias()); + // 判断是否存在 Hudi 表,提前结束掉 + if (!hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias())) { + logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias()); + return; + } + // 获取待压缩的时间点 + ImmutableList selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias()); + if (ObjectUtil.isEmpty(selectedInstants)) { + logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias()); + return; + } + logger.info("[{}] [{}] Selected Instants: {}", flinkJob.getId(), meta.getAlias(), selectedInstants.makeString(",")); + // 计算待压缩的文件数 + long count = predictCompactFileCount(meta, selectedInstants); + if (ObjectUtil.isNotNull(count)) { + // 根据待压缩的文件数计算并行度 + long parallelism = predictParallelism(count); + logger.info("[{}] [{}] Predict compact files: {} {}", flinkJob.getId(), meta.getAlias(), count, parallelism); + meta.getHudi().setCompactionTasks((int) parallelism); + } + logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias()); + String applicationId = Failsafe.with(RETRY_POLICY) + .get(() -> executorService.runCompaction( + batch, + flinkJob, + meta, + hadoopConfiguration.getKerberosKeytabPath(), + hadoopConfiguration.getKerberosPrincipal(), + selectedInstants.collect(HudiInstant::getTimestamp).makeString(","), + clusterConfiguration.getCluster() + ).toString()); + Failsafe.with(RETRY_POLICY) + .run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId)); + } else { + logger.warn("Un acquire lock for " + alias); + throw new JobCannotRunningException(); + } + } finally { + // 无论如何,尝试解锁 + try { + if (lock.isAcquiredInThisProcess()) { + lock.release(); + } + } catch (Exception e) { + logger.error("Release lock failure " + lockPath, e); + } + } + } + private void clearHolder(MutableList> holder) { if (holder.isEmpty()) { return; @@ -259,16 +262,5 @@ public class CompactionService { return Math.toIntExact(parallelism); } - private QueueItem deserialize(String body) { - if (StrUtil.isBlank(body)) { - return null; - } - try { - return mapper.readValue(body, new TypeReference>() { - }); - } catch (Throwable error) { - logger.error("Schedule job parse error", error); - return null; - } - } + private static final class JobCannotRunningException extends Exception {} }