refactor(executor-manager): 使用sync同款jar更新方式

This commit is contained in:
2024-03-01 15:18:30 +08:00
parent 36c31df2c1
commit 7dc38383c5
6 changed files with 111 additions and 31 deletions

View File

@@ -1,9 +1,11 @@
package com.lanyuanxiaoyao.service.executor.manager.service;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.lang.Tuple;
import cn.hutool.core.map.MapBuilder;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.executor.Runner;
@@ -15,6 +17,7 @@ import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.client.cli.ClientOptions;
@@ -26,7 +29,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
@@ -48,6 +50,7 @@ import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
@Service
public class ExecutorTaskService {
private static final Logger logger = LoggerFactory.getLogger(ExecutorTaskService.class);
private static final Pattern EXECUTOR_JAR_NAME = Pattern.compile(".+task-(\\d+)\\.jar");
private final HadoopConfiguration hadoopConfiguration;
private final ExecutorConfiguration executorConfiguration;
@@ -68,7 +71,27 @@ public class ExecutorTaskService {
configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + key, value);
}
private Configuration generateConfiguration(String taskId, String name) {
private String getLatestExecutorJarPath() throws IOException {
try (FileSystem fileSystem = FileSystem.get(new org.apache.hadoop.conf.Configuration())) {
Path root = new Path(executorConfiguration.getTaskJarPath());
return Lists.immutable.of(fileSystem.listStatus(root))
.select(FileStatus::isFile)
.collect(FileStatus::getPath)
.collect(Path::toString)
.select(path -> ReUtil.isMatch(EXECUTOR_JAR_NAME, path))
.collect(path -> new Tuple(path, getLatestExecutorJarVersion(path)))
.reject(tuple -> tuple.<Long>get(1) < 0)
.maxByOptional(tuple -> tuple.<Long>get(1))
.orElseThrow(() -> new RuntimeException("Latest jar path not found"))
.get(0);
}
}
private Long getLatestExecutorJarVersion(String path) {
return Optional.ofNullable(ReUtil.get(EXECUTOR_JAR_NAME, path, 1)).map(Long::valueOf).orElse(-1L);
}
private Configuration generateConfiguration(String taskId, String name) throws IOException {
Configuration configuration = new Configuration();
configuration.setBoolean(JobManagerOptions.JVM_DIRECT_MEMORY_LIMIT_ENABLED, true);
configuration.setString(AkkaOptions.ASK_TIMEOUT, "10 min");
@@ -100,8 +123,11 @@ public class ExecutorTaskService {
setEnvironment(configuration, "task_id", taskId);
// 业务jar包
String executorJarPath = getLatestExecutorJarPath();
logger.info("Executor jar path: {}", executorJarPath);
configuration.set(PipelineOptions.JARS, new ArrayList<String>() {{
add(executorConfiguration.getTaskJarPath());
add(executorJarPath);
}});
return configuration;
}