feat(launcher): 增加单独指定集群进行手动压缩的接口

This commit is contained in:
v-zhangjc9
2024-04-26 10:16:11 +08:00
parent 1808c30786
commit 053a9222cd
5 changed files with 143 additions and 87 deletions

View File

@@ -1,12 +1,15 @@
package com.lanyuanxiaoyao.service.command.commands;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.command.utils.CommandLineUtils;
import com.lanyuanxiaoyao.service.command.utils.TableUtils;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnClusters;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.ScheduleService;
import com.lanyuanxiaoyao.service.forest.service.launcher.LaunchersService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.shell.standard.ShellComponent;
@@ -27,10 +30,14 @@ public class CompactionCommand extends AbstractUtilShellComponent {
private final InfoService infoService;
private final ScheduleService scheduleService;
private final YarnClusters yarnClusters;
private final LaunchersService launchersService;
public CompactionCommand(InfoService infoService, ScheduleService scheduleService) {
// Constructor injection. This commit adds two collaborators: YarnClusters
// (source of the default compaction cluster) and LaunchersService
// (dispatches the start request to a specific cluster's launcher).
public CompactionCommand(InfoService infoService, ScheduleService scheduleService, YarnClusters yarnClusters, LaunchersService launchersService) {
this.infoService = infoService;
this.scheduleService = scheduleService;
this.yarnClusters = yarnClusters;
this.launchersService = launchersService;
}
@ShellMethod("启动表压缩任务")
@@ -59,4 +66,40 @@ public class CompactionCommand extends AbstractUtilShellComponent {
}
);
}
// Shell command: start a table compaction on an explicitly chosen cluster
// (falling back to the configured default cluster when none is given).
// NOTE(review): the method name "compactionDryRun" suggests a dry run, yet it
// actually launches a compaction via launchersService.compactionStart(...);
// the @ShellMethod description is also identical to the other compaction
// command's ("启动表压缩任务") — confirm the intended command name/help text.
@ShellMethod("启动表压缩任务")
public String compactionDryRun(
@ShellOption(
help = "集群",
defaultValue = ""
) String cluster,
@ShellOption(help = "Flink job id") Long flinkJobId,
@ShellOption(help = "别名") String alias,
@ShellOption(
help = "Ignore double check",
defaultValue = "false"
) Boolean ignoreCheck
) {
// Blank cluster option -> use the configured default compaction cluster.
if (StrUtil.isBlank(cluster)) {
cluster = yarnClusters.getDefaultCompactionCluster();
logger.info("Use default compaction cluster: {}", cluster);
}
// Effectively-final copy so the lambda below can capture the resolved cluster.
String targetCluster = cluster;
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
return CommandLineUtils.generateResultLines(
() -> {
if (ObjectUtil.isEmpty(meta)) {
return "没有找到指定的表信息";
} else {
// Print the table metadata so the operator can confirm the target
// before the double-check prompt.
System.out.println(TableUtils.makeTableMeta(meta));
if (doubleCheck(RUN_CONFIRMATION_MESSAGE, ignoreCheck)) {
launchersService.compactionStart(targetCluster, flinkJobId, alias);
return Constants.OPERATION_DONE;
} else {
return Constants.OPERATION_CANCEL;
}
}
}
);
}
}

View File

@@ -14,6 +14,9 @@ public interface LauncherService {
// Stops the synchronizer for the given Flink job.
@Get("/launcher/synchronizer/stop")
void syncStop(@Query("flink_job_id") Long flinkJobId);
// Triggers a compaction run for the given table on the launcher instance this
// client points at (added by this commit for cluster-targeted manual starts).
@Get("/launcher/compaction/start")
void compactionStart(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias);
// Stops a compaction for the given table.
@Get("/launcher/compaction/stop")
void compactionStop(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias);

View File

@@ -46,6 +46,11 @@ public class LaunchersService {
return serviceMap.valuesView().toList().toImmutable();
}
/**
 * Starts a compaction for one table on one specific cluster's launcher,
 * unlike compactionStop below which broadcasts to every registered launcher.
 */
public void compactionStart(String cluster, Long flinkJobId, String alias) {
getService(cluster).compactionStart(flinkJobId, alias);
}
public void compactionStop(Long flinkJobId, String alias) {
for (LauncherService service : getServices()) {
service.compactionStop(flinkJobId, alias);

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.launcher.compaction.controller;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
@@ -8,6 +9,7 @@ import com.lanyuanxiaoyao.service.common.entity.SyncState;
import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.ZookeeperService;
import com.lanyuanxiaoyao.service.launcher.compaction.service.CompactionService;
import com.lanyuanxiaoyao.service.launcher.configuration.ClusterConfiguration;
import java.io.IOException;
import java.util.EnumSet;
@@ -41,13 +43,15 @@ public class CompactionController {
private final ClusterConfiguration clusterConfiguration;
private final InfoService infoService;
private final ZookeeperService zookeeperService;
private final CompactionService compactionService;
private final YarnClient yarnClient;
public CompactionController(DiscoveryClient discoveryClient, ClusterConfiguration clusterConfiguration, InfoService infoService, ZookeeperService zookeeperService) {
public CompactionController(DiscoveryClient discoveryClient, ClusterConfiguration clusterConfiguration, InfoService infoService, ZookeeperService zookeeperService, CompactionService compactionService) {
this.discoveryClient = discoveryClient;
this.clusterConfiguration = clusterConfiguration;
this.infoService = infoService;
this.zookeeperService = zookeeperService;
this.compactionService = compactionService;
yarnClient = YarnClient.createYarnClient();
yarnClient.init(new Configuration());
@@ -62,6 +66,15 @@ public class CompactionController {
}
}
/**
 * HTTP entry point to start a compaction of one table immediately
 * (added by this commit for cluster-targeted manual runs).
 *
 * @param flinkJobId id of the Flink job owning the table
 * @param alias      table alias
 * @throws Exception propagated from CompactionService.compact
 */
@GetMapping("start")
public void start(
@RequestParam("flink_job_id") Long flinkJobId,
@RequestParam("alias") String alias
) throws Exception {
// SLF4J parameterized logging instead of string concatenation.
logger.info("Enter method: start[flinkJobId, alias]. flinkJobId:{},alias:{}", flinkJobId, alias);
// A fresh random 10-char nano id serves as the batch id for this manual run.
compactionService.compact(IdUtil.nanoId(10), flinkJobId, alias);
}
@GetMapping("stop")
public void stop(
@RequestParam("flink_job_id") Long flinkJobId,

View File

@@ -3,7 +3,6 @@ package com.lanyuanxiaoyao.service.launcher.compaction.service;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
@@ -99,7 +98,7 @@ public class CompactionService {
java.util.concurrent.ExecutorService threadPool = Executors.newWorkStealingPool(5);
if (Boolean.FALSE.equals(QueueUtil.isEmpty(discoveryClient, clusterConfiguration.getCompactionQueueName()))) {
for (int index = 0; index < 5; index++) {
threadPool.submit(this::compact);
threadPool.submit(() -> compact());
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
@@ -124,75 +123,21 @@ public class CompactionService {
ScheduleJob job = item.getData();
LogHelper.setMdcFlinkJobAndAlias(job.getFlinkJobId(), job.getAlias());
logger.info("Receive job[{}]({}): {}", item.getTraceId(), item.getCreateTime(), item.getData());
// 构造任务相关的锁
String lockPath = NameHelper.compactionLauncherLockPath(job.getFlinkJobId(), job.getAlias());
InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath);
try {
if (lock.acquire(2, TimeUnit.SECONDS)) {
Stat stat = zookeeperClient.checkExists().forPath(NameHelper.compactionRunningLockPath(job.getFlinkJobId(), job.getAlias()));
if (ObjectUtil.isNotNull(stat)) {
logger.info("Job {} {} is running", job.getFlinkJobId(), job.getAlias());
// 运行中的任务放在持有容器中
holder.add(item);
// 进入下一轮,由于最外层有一个 finally所以直接 continue 也会尝试获取新的任务
continue;
}
FlinkJob flinkJob = infoService.flinkJobDetail(job.getFlinkJobId());
TableMeta meta = infoService.tableMetaDetail(job.getFlinkJobId(), job.getAlias());
if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) {
logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias());
clearHolder(holder);
continue;
}
logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias());
// 判断是否存在 Hudi 表,提前结束掉
if (!hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias())) {
logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias());
clearHolder(holder);
continue;
}
// 获取待压缩的时间点
ImmutableList<HudiInstant> selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias());
if (ObjectUtil.isEmpty(selectedInstants)) {
logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias());
clearHolder(holder);
continue;
}
logger.info("[{}] [{}] Selected Instants: {}", flinkJob.getId(), meta.getAlias(), selectedInstants.makeString(","));
// 计算待压缩的文件数
long count = predictCompactFileCount(meta, selectedInstants);
if (ObjectUtil.isNotNull(count)) {
// 根据待压缩的文件数计算并行度
long parallelism = predictParallelism(count);
logger.info("[{}] [{}] Predict compact files: {} {}", flinkJob.getId(), meta.getAlias(), count, parallelism);
meta.getHudi().setCompactionTasks((int) parallelism);
}
logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias());
String applicationId = Failsafe.with(RETRY_POLICY)
.get(() -> executorService.runCompaction(
job.getBatch(),
flinkJob,
meta,
hadoopConfiguration.getKerberosKeytabPath(),
hadoopConfiguration.getKerberosPrincipal(),
selectedInstants.collect(HudiInstant::getTimestamp).makeString(","),
clusterConfiguration.getCluster()
).toString());
Failsafe.with(RETRY_POLICY)
.run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId));
clearHolder(holder);
} else {
logger.warn("Un acquire lock for " + item.getId());
holder.add(item);
}
compact(job.getBatch(), job.getFlinkJobId(), job.getAlias());
clearHolder(holder);
} catch (JobCannotRunningException e) {
// 运行中的任务放在持有容器中
holder.add(item);
// 进入下一轮,由于最外层有一个 finally所以直接 continue 也会尝试获取新的任务
} catch (Exception e) {
logger.warn(StrUtil.format("[{}] [{}] Try lock something wrong ", job.getFlinkJobId(), job.getAlias()), e);
logger.warn(StrUtil.format("[{}] [{}] Try compaction wrong ", job.getFlinkJobId(), job.getAlias()), e);
String failCount = item.getMetadata(Constants.SCHEDULE_JOB_FAIL_COUNT);
if (StrUtil.isNotBlank(failCount)) {
int fail = Integer.parseInt(failCount);
if (fail > 5) {
logger.error("Job {} cause unaccepted error", item);
continue;
return;
} else {
item.getMetadata().put(Constants.SCHEDULE_JOB_FAIL_COUNT, String.valueOf(fail + 1));
}
@@ -200,15 +145,6 @@ public class CompactionService {
item.getMetadata().put(Constants.SCHEDULE_JOB_FAIL_COUNT, "1");
}
QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, item);
} finally {
// 无论如何,尝试解锁
try {
if (lock.isAcquiredInThisProcess()) {
lock.release();
}
} catch (Exception e) {
logger.error("Release lock failure " + lockPath, e);
}
}
} else {
logger.warn("Schedule job is empty. [{}]({}): {}", item.getTraceId(), item.getCreateTime(), item);
@@ -222,6 +158,73 @@ public class CompactionService {
clearHolder(holder);
}
/**
 * Runs one compaction attempt for the given table (flinkJobId + alias),
 * guarded by a per-job ZooKeeper mutex. Extracted by this commit so it can be
 * called both from the queue consumer loop and directly from the manual
 * /launcher/compaction/start endpoint.
 *
 * @param batch      batch id tagging this compaction run
 * @param flinkJobId id of the Flink job owning the table
 * @param alias      table alias
 * @throws JobCannotRunningException if the job is already running or the lock
 *                                   could not be acquired within 2 seconds
 * @throws Exception any failure from ZooKeeper, metadata lookup or submission
 */
public void compact(String batch, Long flinkJobId, String alias) throws Exception {
// Build the job-scoped launcher lock so concurrent launchers cannot
// double-submit the same table.
String lockPath = NameHelper.compactionLauncherLockPath(flinkJobId, alias);
InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath);
try {
if (lock.acquire(2, TimeUnit.SECONDS)) {
// A "running" marker node means a compaction for this table is in flight.
Stat stat = zookeeperClient.checkExists().forPath(NameHelper.compactionRunningLockPath(flinkJobId, alias));
if (ObjectUtil.isNotNull(stat)) {
logger.info("Job {} {} is running", flinkJobId, alias);
throw new JobCannotRunningException();
}
FlinkJob flinkJob = infoService.flinkJobDetail(flinkJobId);
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
// Tables explicitly tagged "no compact" are skipped.
if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) {
logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias());
return;
}
logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias());
// Bail out early if the Hudi table does not exist.
if (!hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias())) {
logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias());
return;
}
// Collect the pending compaction instants; nothing pending means no work.
ImmutableList<HudiInstant> selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias());
if (ObjectUtil.isEmpty(selectedInstants)) {
logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias());
return;
}
logger.info("[{}] [{}] Selected Instants: {}", flinkJob.getId(), meta.getAlias(), selectedInstants.makeString(","));
// Estimate the number of files to compact.
// NOTE(review): count is a primitive long, so ObjectUtil.isNotNull(count) is
// always true after autoboxing — if "no estimate" is a possible outcome,
// predictCompactFileCount should return a nullable Long instead. Confirm.
long count = predictCompactFileCount(meta, selectedInstants);
if (ObjectUtil.isNotNull(count)) {
// Derive the compaction parallelism from the predicted file count.
long parallelism = predictParallelism(count);
logger.info("[{}] [{}] Predict compact files: {} {}", flinkJob.getId(), meta.getAlias(), count, parallelism);
meta.getHudi().setCompactionTasks((int) parallelism);
}
logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias());
// Submit the compaction under the retry policy, then persist the
// resulting YARN application id (also retried).
String applicationId = Failsafe.with(RETRY_POLICY)
.get(() -> executorService.runCompaction(
batch,
flinkJob,
meta,
hadoopConfiguration.getKerberosKeytabPath(),
hadoopConfiguration.getKerberosPrincipal(),
selectedInstants.collect(HudiInstant::getTimestamp).makeString(","),
clusterConfiguration.getCluster()
).toString());
Failsafe.with(RETRY_POLICY)
.run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId));
} else {
logger.warn("Un acquire lock for " + alias);
throw new JobCannotRunningException();
}
} finally {
// Always attempt to release the lock, whatever happened above.
try {
if (lock.isAcquiredInThisProcess()) {
lock.release();
}
} catch (Exception e) {
logger.error("Release lock failure " + lockPath, e);
}
}
}
private void clearHolder(MutableList<QueueItem<ScheduleJob>> holder) {
if (holder.isEmpty()) {
return;
@@ -259,16 +262,5 @@ public class CompactionService {
return Math.toIntExact(parallelism);
}
// Parses a queued message body into a QueueItem<ScheduleJob>; returns null for
// blank or unparseable input so callers can treat bad messages as "no job".
// NOTE(review): the hunk header (-259,16 +262,5) suggests this helper is being
// removed by this commit — confirm nothing still references it.
private QueueItem<ScheduleJob> deserialize(String body) {
if (StrUtil.isBlank(body)) {
return null;
}
try {
return mapper.readValue(body, new TypeReference<QueueItem<ScheduleJob>>() {
});
} catch (Throwable error) {
logger.error("Schedule job parse error", error);
return null;
}
}
private static final class JobCannotRunningException extends Exception {}
}