feat(scheduler): 迁移service-launcher项目

This commit is contained in:
2024-02-28 18:37:12 +08:00
parent f258bfe5b0
commit 474ee6173d
27 changed files with 1599 additions and 44 deletions

176
service-launcher/pom.xml Normal file
View File

@@ -0,0 +1,176 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.lanyuanxiaoyao</groupId>
<artifactId>hudi-service</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>service-launcher</artifactId>
<dependencies>
<dependency>
<groupId>com.lanyuanxiaoyao</groupId>
<artifactId>service-configuration</artifactId>
<version>1.0.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink${flink.major.version}-bundle</artifactId>
<version>${hudi.version}</version>
</dependency>
<dependency>
<groupId>com.lanyuanxiaoyao</groupId>
<artifactId>service-forest</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<!-- Not sure why, but this exclusion is required when using logback -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
<dependency>
<groupId>com.eshore.odcp.hudi.connector</groupId>
<artifactId>executor</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
<version>1.19.4</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<version>1.19.4</version>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-apache-client4</artifactId>
<version>1.19.4</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}-${build-tag}-${project.version}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>copy-config-file</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<resources>
<resource>
<directory>${project.parent.basedir}/config/${build-tag}</directory>
<includes>
<include>*.xml</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>
com.lanyuanxiaoyao.service.launcher.LauncherRunnerApplication
</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>core-default.xml</exclude>
<exclude>hdfs-default.xml</exclude>
<exclude>yarn-default.xml</exclude>
<exclude>log4j-surefire*.properties</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,355 @@
package com.lanyuanxiaoyao.service.launcher;
import cn.hutool.core.lang.Tuple;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.Constants;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.eshore.odcp.hudi.connector.exception.CheckpointRootPathNotFoundException;
import com.eshore.odcp.hudi.connector.utils.NameHelper;
import com.eshore.odcp.hudi.connector.utils.executor.Runner;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration;
import com.lanyuanxiaoyao.service.launcher.configuration.HudiConfiguration;
import com.lanyuanxiaoyao.service.launcher.utils.HadoopUtil;
import com.lanyuanxiaoyao.service.launcher.utils.JacksonUtil;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import javax.annotation.Resource;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.configuration.*;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hudi.common.util.collection.Pair;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import static com.eshore.odcp.hudi.connector.Constants.HALF_HOUR;
import static com.eshore.odcp.hudi.connector.Constants.MINUTE;
/**
 * Executor: builds Flink/YARN configurations and launches sync and
 * compaction jobs on YARN, using the latest executor jar discovered on HDFS.
 *
 * @author ZhangJiacheng
 * @date 2022-06-06
 */
@Service
public class ExecutorService {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorService.class);
    // Shared Jackson mapper used to serialize job/table metadata into CLI arguments.
    private static final ObjectMapper MAPPER = JacksonUtil.getMapper();
    // Matches executor jar paths such as ".../foo-sync-42.jar"; capture group 1 is the numeric version.
    private static final Pattern EXECUTOR_JAR_NAME = Pattern.compile(".+sync-(\\d+)\\.jar");
    @Resource
    private InfoService infoService;
    @Resource
    private HadoopConfiguration hadoopConfiguration;
    @Resource
    private HudiConfiguration hudiConfiguration;

    /**
     * Builds the CLI arguments for the sync main class: the Flink job and the
     * table-meta list are JSON-encoded behind their option flags, followed by
     * the target cluster name.
     *
     * @throws JsonProcessingException if metadata serialization fails
     */
    public static String[] syncArgs(FlinkJob flinkJob, ImmutableList<TableMeta> tableMetaList, String cluster) throws JsonProcessingException {
        List<String> argsList = Lists.mutable.empty();
        argsList.add(Constants.FLINK_JOB_OPTION);
        argsList.add(MAPPER.writeValueAsString(flinkJob));
        argsList.add(Constants.TABLE_META_LIST_OPTION);
        argsList.add(MAPPER.writeValueAsString(tableMetaList));
        argsList.add(Constants.CLUSTER_OPTION);
        argsList.add(cluster);
        return argsList.toArray(new String[]{});
    }

    /**
     * Builds the CLI arguments for the compaction main class. The instants
     * option is only appended when a non-empty instants string is supplied.
     *
     * @throws JsonProcessingException if metadata serialization fails
     */
    public static String[] compactionArgs(FlinkJob flinkJob, TableMeta tableMeta, String instants, String cluster) throws JsonProcessingException {
        List<String> argsList = Lists.mutable.empty();
        argsList.add(Constants.FLINK_JOB_OPTION);
        argsList.add(MAPPER.writeValueAsString(flinkJob));
        argsList.add(Constants.TABLE_META_OPTION);
        argsList.add(MAPPER.writeValueAsString(tableMeta));
        if (ObjectUtil.isNotEmpty(instants)) {
            argsList.add(Constants.INSTANTS_OPTION);
            argsList.add(instants);
        }
        argsList.add(Constants.CLUSTER_OPTION);
        argsList.add(cluster);
        return argsList.toArray(new String[]{});
    }

    /**
     * Derives JobManager/TaskManager memory sizes from the job's run mode and
     * the per-table YARN settings. Metaspace is sized at 25m per table (or
     * per schema/table group), and results are clamped to minimums of 512m
     * heap and 256m metaspace.
     */
    private Memory jobMemory(FlinkJob flinkJob, ImmutableList<TableMeta> tableMetas) {
        int jobMemory = 1024;
        int jobMetaspaceMemory = 256;
        int taskMemory = 512;
        int taskMetaspaceMemory = 256;
        switch (flinkJob.getRunMode()) {
            case ALL_IN_ONE:
                // Single job for all tables: max JM memory across tables, TM memory is the sum.
                jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory);
                jobMetaspaceMemory = tableMetas.size() * 25;
                taskMemory = (int) tableMetas.sumOfInt(meta -> meta.getSyncYarn().getTaskManagerMemory());
                break;
            case ALL_IN_ONE_BY_SCHEMA:
                // Grouped by schema: TM memory is the largest per-schema sum.
                jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory);
                jobMetaspaceMemory = tableMetas.distinctBy(TableMeta::getSchema).size() * 25;
                taskMemory = (int) tableMetas.sumByInt(TableMeta::getSchema, meta -> meta.getSyncYarn().getTaskManagerMemory()).max();
                break;
            case ALL_IN_ONE_BY_TABLE:
                // Grouped by table: TM memory is the largest per-table sum.
                jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory);
                jobMetaspaceMemory = tableMetas.distinctBy(TableMeta::getTable).size() * 25;
                taskMemory = (int) tableMetas.sumByInt(TableMeta::getTable, meta -> meta.getSyncYarn().getTaskManagerMemory()).max();
                break;
            case ONE_IN_ONE:
                // One job per table: take sizes from the dedicated one-in-one YARN settings.
                // jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory);
                jobMemory = flinkJob.getOneInOneSyncYarn().getJobManagerMemory();
                jobMetaspaceMemory = tableMetas.size() * 25;
                taskMemory = flinkJob.getOneInOneSyncYarn().getTaskManagerMemory();
                break;
            default:
        }
        // TM metaspace: at most a third of TM memory, capped by 25m per table.
        taskMetaspaceMemory = Math.min(taskMemory / 3, tableMetas.size() * 25);
        return new Memory(
                Math.max(512, jobMemory),
                Math.max(256, jobMetaspaceMemory),
                Math.max(512, taskMemory),
                Math.max(256, taskMetaspaceMemory)
        );
    }

    /**
     * Lists the configured HDFS app directory and returns the path of the
     * executor jar with the highest embedded version number.
     *
     * @throws IOException      on HDFS access failure
     * @throws RuntimeException when no matching jar is found
     */
    private String getLatestExecutorJarPath() throws IOException {
        try (FileSystem fileSystem = HadoopUtil.createFileSystem(HadoopUtil.createConfiguration(hadoopConfiguration))) {
            Path root = new Path(hudiConfiguration.getAppHdfsPath());
            return Lists.immutable.of(fileSystem.listStatus(root))
                    .select(FileStatus::isFile)
                    .collect(FileStatus::getPath)
                    .collect(Path::toString)
                    .select(path -> ReUtil.isMatch(EXECUTOR_JAR_NAME, path))
                    .collect(path -> new Tuple(path, getLatestExecutorJarVersion(path)))
                    .reject(tuple -> tuple.<Long>get(1) < 0)
                    .maxByOptional(tuple -> tuple.<Long>get(1))
                    .orElseThrow(() -> new RuntimeException("Latest jar path not found"))
                    .get(0);
        }
    }

    /** Extracts the numeric version from an executor jar path; -1 when it does not match. */
    private Long getLatestExecutorJarVersion(String path) {
        return Optional.ofNullable(ReUtil.get(EXECUTOR_JAR_NAME, path, 1)).map(Long::valueOf).orElse(-1L);
    }

    /** Sets an environment variable for both the JobManager and TaskManager containers. */
    private void setEnvironment(Configuration configuration, String key, String value) {
        configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + key, value);
        configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + key, value);
    }

    /**
     * Common Flink configuration shared by sync and compaction runs: timeouts,
     * Kerberos login, heartbeat intervals, YARN application deployment target
     * and the optional VictoriaMetrics reporter.
     */
    private Configuration commonConfiguration(String keytabPath, String principal) {
        Configuration configuration = new Configuration();
        configuration.setBoolean(JobManagerOptions.JVM_DIRECT_MEMORY_LIMIT_ENABLED, true);
        // configuration.setString(JobManagerOptions.ARCHIVE_DIR, hudiConfiguration.getArchiveHdfsPath());
        // NOTE(review): ASK_TIMEOUT and TCP_TIMEOUT are set again below with
        // smaller values ("1 min" / "2 min") which override these — confirm
        // which values are intended and remove the duplicates.
        configuration.setString(AkkaOptions.ASK_TIMEOUT, "10 min");
        configuration.setString(AkkaOptions.TCP_TIMEOUT, "15 min");
        configuration.setString(AkkaOptions.LOOKUP_TIMEOUT, "10 min");
        configuration.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(30));
        // Kerberos authentication
        configuration.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, true);
        if (StrUtil.isNotBlank(keytabPath)) {
            configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
        }
        if (StrUtil.isNotBlank(principal)) {
            configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal);
        }
        // configuration.setString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS, "0.0.0.0");
        // configuration.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 8083);
        configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, MINUTE);
        configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, HALF_HOUR);
        // These override the "10 min"/"15 min" values set earlier (see note above).
        configuration.setString(AkkaOptions.ASK_TIMEOUT, "1 min");
        configuration.setString(AkkaOptions.TCP_TIMEOUT, "2 min");
        configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
        configuration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "4");
        // configuration.setString(YarnConfigOptions.STAGING_DIRECTORY, "hdfs://b2/apps/datalake/yarn");
        // Limit glibc malloc arenas to reduce native memory fragmentation in containers.
        configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
        configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
        configuration.setString(RestOptions.BIND_PORT, "8084-9400");
        configuration.setString(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
        if (StrUtil.isNotBlank(hudiConfiguration.getVictoriaPushUrl())) {
            configuration.setString("metrics.reporter.victoria.class", "com.eshore.odcp.hudi.connector.utils.executor.metrics.VictoriaMetricsReporter");
            configuration.setString("metrics.reporter.victoria.endpoint", hudiConfiguration.getVictoriaPushUrl());
            configuration.setBoolean("metrics.reporter.victoria.enable.auth", true);
            // NOTE(review): hard-coded metrics credentials committed to source —
            // move these into configuration / a secret store and rotate them.
            configuration.setString("metrics.reporter.victoria.auth.username", "EsCFVuNkiDWv7PKmcF");
            configuration.setString("metrics.reporter.victoria.auth.password", "Abf%x9ocS^iKr3tgrd");
        }
        // Set JVM options (kept commented out)
        // configuration.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, "-XX:CMSInitiatingOccupancyFraction=40");
        // configuration.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, "-XX:+UseG1GC -XX:MaxGCPauseMillis=1500 -XX:InitiatingHeapOccupancyPercent=30 -XX:SurvivorRatio=4 -XX:ParallelGCThreads=60");
        return configuration;
    }

    /**
     * Convenience overload: loads the Flink job and its table metas by id,
     * then delegates to {@link #runSync(FlinkJob, ImmutableList, String, String, String)}.
     *
     * @throws RuntimeException when the job has no table metas
     */
    public ApplicationId runSync(Long flinkJobId, String keytabPath, String principal, String cluster) throws Exception {
        FlinkJob flinkJob = infoService.flinkJobDetail(flinkJobId);
        ImmutableList<TableMeta> tableMetas = infoService.tableMetaList(flinkJobId);
        if (ObjectUtil.isEmpty(tableMetas)) {
            throw new RuntimeException("Table metas not found");
        }
        return runSync(flinkJob, tableMetas, keytabPath, principal, cluster);
    }

    /**
     * Launches a sync job on YARN: deletes the job's old checkpoint directory,
     * sizes memory via {@link #jobMemory}, wires metrics labels into container
     * environments, and submits the latest executor jar via {@code Runner.run}.
     */
    public ApplicationId runSync(FlinkJob flinkJob, ImmutableList<TableMeta> tableMetaList, String keytabPath, String principal, String cluster) throws Exception {
        logger.info("Running Sync: {} {} {}", flinkJob.getId(), keytabPath, principal);
        try (FileSystem fileSystem = HadoopUtil.createFileSystem(HadoopUtil.createConfiguration(hadoopConfiguration))) {
            // All tables of a job must share one checkpoint root; take the first distinct value.
            String checkpointRootPath = tableMetaList.collect(meta -> meta.getConfig().getCheckpointRootPath())
                    .distinct()
                    .getFirstOptional()
                    .orElseThrow(CheckpointRootPathNotFoundException::new);
            String checkpointPath = checkpointRootPath + "/" + flinkJob.getId();
            logger.info("Delete checkpoint path {}", checkpointPath);
            // Recursive delete: the job restarts from a clean checkpoint directory.
            fileSystem.delete(new Path(checkpointPath), true);
        }
        Configuration configuration = commonConfiguration(keytabPath, principal);
        Memory memory = jobMemory(flinkJob, tableMetaList);
        // JobManager memory
        MemorySize jobMemorySize = MemorySize.parse(memory.jobMemory + "m");
        configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, jobMemorySize);
        configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.parse(memory.jobMetaspaceMemory + "m"));
        // TaskManager memory
        MemorySize taskMemorySize = MemorySize.parse(memory.taskMemory + "m");
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, taskMemorySize);
        configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("0m"));
        configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.parse(memory.taskMetaspaceMemory + "m"));
        // Parallelism
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10);
        configuration.setString(YarnConfigOptions.APPLICATION_NAME, NameHelper.syncJobName(flinkJob.getId(), flinkJob.getName()));
        // configuration.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "hdfs://b2/apps/flink/completed-jobs/");
        // configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000);
        // Business (executor) jar
        String executorJarPath = getLatestExecutorJarPath();
        logger.info("Executor jar path: {}", executorJarPath);
        Long executorJarVersion = getLatestExecutorJarVersion(executorJarPath);
        configuration.set(PipelineOptions.JARS, new ArrayList<String>() {{
            add(executorJarPath);
        }});
        // Whitespace is not allowed in metrics label values; replace with underscores.
        String flinkJobName = flinkJob.getName().replaceAll("\\s", "_");
        setEnvironment(configuration, Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_SYNC);
        setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_ID, String.valueOf(flinkJob.getId()));
        setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJobName);
        setEnvironment(configuration, Constants.METRICS_LABEL_EXECUTOR_VERSION, String.valueOf(executorJarVersion));
        setEnvironment(configuration, Constants.METRICS_LABEL_CLUSTER, cluster);
        setEnvironment(configuration, Constants.LOKI_PUSH_URL, hudiConfiguration.getLokiPushUrl());
        // Tags are encoded as "key=value" pairs joined by ';'.
        configuration.setString("metrics.reporter.victoria.tags", Lists.immutable.of(
                Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_SYNC),
                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJobName),
                Pair.of(Constants.METRICS_LABEL_EXECUTOR_VERSION, executorJarVersion)
        ).collect(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).makeString(";"));
        return Runner.run(
                configuration,
                "com.eshore.odcp.hudi.connector.sync.Synchronizer",
                syncArgs(flinkJob, tableMetaList, cluster)
        );
    }

    /**
     * Launches a compaction job for a single table on YARN, sized from the
     * table's compaction YARN settings, and submits the latest executor jar
     * via {@code Runner.run}.
     */
    public ApplicationId runCompaction(String batchCode, FlinkJob flinkJob, TableMeta tableMeta, String keytabPath, String principal, String instants, String cluster) throws Exception {
        logger.info("Running Compaction: {} {} {} {} {}", batchCode, flinkJob.getId(), tableMeta.getAlias(), keytabPath, principal);
        Configuration configuration = commonConfiguration(keytabPath, principal);
        MemorySize jobMemorySize = MemorySize.parse(tableMeta.getCompactionYarn().getJobManagerMemory() + "m");
        configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, jobMemorySize);
        MemorySize taskMemorySize = MemorySize.parse(tableMeta.getCompactionYarn().getTaskManagerMemory() + "m");
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, taskMemorySize);
        configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("0m"));
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10);
        configuration.setString(YarnConfigOptions.APPLICATION_NAME, NameHelper.compactionJobName(flinkJob.getId(), tableMeta.getAlias()));
        String executorJarPath = getLatestExecutorJarPath();
        logger.info("Executor jar path: {}", executorJarPath);
        Long executorJarVersion = getLatestExecutorJarVersion(executorJarPath);
        configuration.set(PipelineOptions.JARS, new ArrayList<String>() {{
            add(executorJarPath);
        }});
        String flinkJobName = flinkJob.getName().replaceAll("\\s", "_");
        setEnvironment(configuration, Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_COMPACTION);
        setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_ID, String.valueOf(flinkJob.getId()));
        setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJobName);
        setEnvironment(configuration, Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema());
        setEnvironment(configuration, Constants.METRICS_LABEL_TABLE, tableMeta.getTable());
        setEnvironment(configuration, Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias());
        setEnvironment(configuration, Constants.METRICS_LABEL_BATCH_ID, batchCode);
        setEnvironment(configuration, Constants.METRICS_LABEL_EXECUTOR_VERSION, String.valueOf(executorJarVersion));
        setEnvironment(configuration, Constants.METRICS_LABEL_CLUSTER, cluster);
        setEnvironment(configuration, Constants.LOKI_PUSH_URL, hudiConfiguration.getLokiPushUrl());
        configuration.setString("metrics.reporter.victoria.tags", Lists.immutable.of(
                Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_COMPACTION),
                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJob.getName().replaceAll("\\s", "_")),
                Pair.of(Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()),
                Pair.of(Constants.METRICS_LABEL_TABLE, tableMeta.getTable()),
                Pair.of(Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias()),
                Pair.of(Constants.METRICS_LABEL_BATCH_ID, batchCode),
                Pair.of(Constants.METRICS_LABEL_EXECUTOR_VERSION, executorJarVersion)
        ).collect(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).makeString(";"));
        return Runner.run(
                configuration,
                "com.eshore.odcp.hudi.connector.sync.Compactor",
                compactionArgs(flinkJob, tableMeta, instants, cluster)
        );
    }

    /** Immutable holder for the four memory sizes (megabytes) computed by {@link #jobMemory}. */
    private static final class Memory {
        private final int jobMemory;           // JobManager total process memory (MB)
        private final int jobMetaspaceMemory;  // JobManager JVM metaspace (MB)
        private final int taskMemory;          // TaskManager total Flink memory (MB)
        private final int taskMetaspaceMemory; // TaskManager JVM metaspace (MB)

        public Memory(int jobMemory, int jobMetaspaceMemory, int taskMemory, int taskMetaspaceMemory) {
            this.jobMemory = jobMemory;
            this.jobMetaspaceMemory = jobMetaspaceMemory;
            this.taskMemory = taskMemory;
            this.taskMetaspaceMemory = taskMetaspaceMemory;
        }
    }
}

View File

@@ -0,0 +1,35 @@
package com.lanyuanxiaoyao.service.launcher;
import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScans;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Application entry point for the launcher service. Enables service
 * discovery, scheduling, encryptable properties and configuration-property
 * binding; Gson auto-configuration is excluded.
 *
 * @author ZhangJiacheng
 * @date 2023-04-05
 */
@EnableDiscoveryClient
@SpringBootApplication(exclude = {GsonAutoConfiguration.class})
@ComponentScans({
        @ComponentScan("com.lanyuanxiaoyao.service"),
})
@EnableScheduling
@EnableConfigurationProperties
@EnableEncryptableProperties
public class LauncherRunnerApplication {
    private static final Logger logger = LoggerFactory.getLogger(LauncherRunnerApplication.class);

    public static void main(String[] args) {
        // Equivalent to SpringApplication.run(LauncherRunnerApplication.class, args).
        SpringApplication application = new SpringApplication(LauncherRunnerApplication.class);
        application.run(args);
    }
}

View File

@@ -0,0 +1,110 @@
package com.lanyuanxiaoyao.service.launcher.compaction.controller;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.Constants;
import com.eshore.odcp.hudi.connector.entity.RunMeta;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.ZookeeperService;
import com.lanyuanxiaoyao.service.launcher.configuration.ClusterConfiguration;
import java.io.IOException;
import java.util.EnumSet;
import javax.annotation.PreDestroy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * Cluster-bound operations: stopping compaction jobs and killing the
 * corresponding YARN applications.
 *
 * @author ZhangJiacheng
 * @date 2023-05-30
 */
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@RestController
@RequestMapping("launcher/compaction")
public class CompactionController {
    private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);
    private final DiscoveryClient discoveryClient;
    private final ClusterConfiguration clusterConfiguration;
    private final InfoService infoService;
    private final ZookeeperService zookeeperService;
    // YARN client created/started here and released in destroy().
    private final YarnClient yarnClient;

    public CompactionController(DiscoveryClient discoveryClient, ClusterConfiguration clusterConfiguration, InfoService infoService, ZookeeperService zookeeperService) {
        this.discoveryClient = discoveryClient;
        this.clusterConfiguration = clusterConfiguration;
        this.infoService = infoService;
        this.zookeeperService = zookeeperService;
        yarnClient = YarnClient.createYarnClient();
        // NOTE(review): new Configuration() only loads *-site.xml from the
        // classpath — confirm the Hadoop config files are available there.
        yarnClient.init(new Configuration());
        yarnClient.start();
    }

    /** Releases the YARN client on container shutdown. */
    @PreDestroy
    public void destroy() throws IOException {
        if (ObjectUtil.isNotNull(yarnClient)) {
            yarnClient.stop();
            yarnClient.close();
        }
    }

    /**
     * Stops a compaction job: optionally disables the table meta, removes the
     * job from both compaction queues, then best-effort kills its YARN
     * application (resolved from Zookeeper run meta, falling back to the sync
     * state record).
     *
     * @param flinkJobId  id of the owning Flink job
     * @param alias       table alias within the job
     * @param disableMeta whether to disable the table info first (default true)
     */
    @GetMapping("stop")
    public void stop(
            @RequestParam("flink_job_id") Long flinkJobId,
            @RequestParam("alias") String alias,
            @RequestParam(value = "disable_meta", defaultValue = "true") Boolean disableMeta
    ) {
        // Parameterized logging instead of string concatenation (SLF4J idiom).
        logger.info("Enter method: stop[flinkJobId, alias]. flinkJobId:{},alias:{}", flinkJobId, alias);
        if (disableMeta) {
            logger.info("Delete table info");
            infoService.disableTable(flinkJobId, alias);
        }
        QueueUtil.remove(discoveryClient, clusterConfiguration.getCompactionQueueName(), StrUtil.format("{}-{}", flinkJobId, alias));
        // Remove once more to avoid a timing race that would put the job back into the pre-schedule queue.
        QueueUtil.remove(discoveryClient, Constants.COMPACTION_QUEUE_PRE, StrUtil.format("{}-{}", flinkJobId, alias));
        logger.info("Remove job from queue");
        try {
            RunMeta meta = zookeeperService.getSyncRunMeta(flinkJobId, alias);
            if (ObjectUtil.isNotNull(meta) && ObjectUtil.isNotEmpty(meta.getApplicationId())) {
                String applicationId = meta.getApplicationId();
                stopApp(applicationId);
                logger.info("Found application id and kill: {}", applicationId);
            } else {
                // Fall back to the application id recorded in the sync state.
                SyncState syncState = infoService.syncStateDetail(flinkJobId, alias);
                if (ObjectUtil.isNotEmpty(syncState.getCompactionApplicationId())) {
                    String applicationId = syncState.getCompactionApplicationId();
                    stopApp(applicationId);
                    logger.info("Found application id and kill: {}", applicationId);
                }
            }
        } catch (Throwable throwable) {
            // Best-effort: the application may already be stopped. Keep the
            // swallow-and-warn behaviour but record the cause for diagnosis.
            logger.warn("Application id not found, it may stop yet", throwable);
        }
    }

    /**
     * Kills the given YARN application, but only if it is currently RUNNING.
     *
     * @throws IOException   on YARN client I/O failure
     * @throws YarnException on YARN-side errors
     */
    @GetMapping("stop_app")
    public void stopApp(@RequestParam("application_id") String applicationId) throws IOException, YarnException {
        boolean exists = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING))
                .stream()
                .anyMatch(r -> StrUtil.equals(r.getApplicationId().toString(), applicationId));
        if (exists) {
            yarnClient.killApplication(ApplicationId.fromString(applicationId));
        } else {
            logger.info("Application: {} not running", applicationId);
        }
    }
}

View File

@@ -0,0 +1,274 @@
package com.lanyuanxiaoyao.service.launcher.compaction.service;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.Constants;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.eshore.odcp.hudi.connector.entity.compaction.ScheduleJob;
import com.eshore.odcp.hudi.connector.utils.LogHelper;
import com.eshore.odcp.hudi.connector.utils.NameHelper;
import com.eshore.odcp.hudi.connector.utils.TableMetaHelper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.launcher.ExecutorService;
import com.lanyuanxiaoyao.service.launcher.configuration.ClusterConfiguration;
import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration;
import com.lanyuanxiaoyao.service.launcher.configuration.ZookeeperConfiguration;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* @author ZhangJiacheng
* @date 2023-05-08
*/
@Service
public class CompactionService {
    private static final Logger logger = LoggerFactory.getLogger(CompactionService.class);
    // Shared work-stealing pool (parallelism 20); usage not visible in this chunk — TODO confirm.
    private static final java.util.concurrent.ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
    // Retry up to 3 times with a 1-second delay on any Exception.
    private static final RetryPolicy<String> RETRY_POLICY = RetryPolicy.<String>builder()
            .handle(Exception.class)
            .withDelay(Duration.ofSeconds(1))
            .withMaxAttempts(3)
            .build();
    // Parallelism sizing constants (presumably files-per-slot heuristics — verify against usage).
    private static final int FILE_PER_PARALLELISM = 2;
    private static final int MAX_PARALLELISM = 500;
    private static final int MIN_PARALLELISM = 10;
    private final HadoopConfiguration hadoopConfiguration;
    private final ClusterConfiguration clusterConfiguration;
    private final DiscoveryClient discoveryClient;
    // Curator client started in the constructor and closed in destroy().
    private final CuratorFramework zookeeperClient;
    private final InfoService infoService;
    private final HudiService hudiService;
    private final ExecutorService executorService;
    // Jackson mapper built from the Spring-configured builder.
    private final ObjectMapper mapper;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public CompactionService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ZookeeperConfiguration zookeeperConfiguration, DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ExecutorService executorService, Jackson2ObjectMapperBuilder builder) {
this.hadoopConfiguration = hadoopConfiguration;
this.clusterConfiguration = clusterConfiguration;
this.discoveryClient = discoveryClient;
this.infoService = infoService;
this.hudiService = hudiService;
this.executorService = executorService;
mapper = builder.build();
this.zookeeperClient = CuratorFrameworkFactory.builder()
.connectString(zookeeperConfiguration.getConnectUrl())
.retryPolicy(new ExponentialBackoffRetry((int) Constants.SECOND, 3))
.sessionTimeoutMs((int) Constants.SECOND)
.connectionTimeoutMs((int) (30 * Constants.SECOND))
.build();
this.zookeeperClient.start();
}
@PreDestroy
public void destroy() {
if (ObjectUtil.isNotNull(this.zookeeperClient)) {
this.zookeeperClient.close();
}
}
/**
 * Periodic entry point: when the compaction queue has pending jobs, spin up five
 * workers that drain it via {@link #compact()} and block until they all finish.
 * <p>
 * Fixes over the previous version: the worker pool is only created when there is
 * work to do (it used to be created unconditionally and never shut down on the
 * empty-queue path), and the busy-wait {@code isTerminated()}/sleep loop is
 * replaced by {@code awaitTermination} with proper interrupt handling.
 */
@Scheduled(fixedDelay = 5, initialDelay = 30, timeUnit = TimeUnit.SECONDS)
public void scheduleCompact() {
    // Boolean.FALSE.equals(...) deliberately treats a null isEmpty() result as "skip".
    if (Boolean.FALSE.equals(QueueUtil.isEmpty(discoveryClient, clusterConfiguration.getCompactionQueueName()))) {
        java.util.concurrent.ExecutorService threadPool = Executors.newWorkStealingPool(5);
        try {
            for (int index = 0; index < 5; index++) {
                threadPool.submit(this::compact);
            }
        } finally {
            // No new tasks after this point; lets awaitTermination complete.
            threadPool.shutdown();
        }
        try {
            // Wait until every worker has drained the queue.
            while (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
                // still running; keep waiting
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for compaction workers", e);
        }
        logger.info("Finish compaction schedule");
    }
}
@SuppressWarnings("DataFlowIssue")
private void compact() {
// Worker loop: drain the compaction queue one item at a time. Jobs that cannot
// run right now (lock busy, already running) are parked in a temporary holder
// list; as soon as a runnable job is processed, the parked batch is requeued.
MutableList<QueueItem<ScheduleJob>> holder = Lists.mutable.empty();
QueueItem<ScheduleJob> item = QueueUtil.poll(discoveryClient, this.mapper, clusterConfiguration.getCompactionQueueName());
// A null item means the queue is empty and this worker can stop.
while (ObjectUtil.isNotNull(item)) {
LogHelper.setMdc(Constants.LOG_JOB_ID_LABEL, item.getTraceId());
try {
// Skip items whose payload is missing.
if (ObjectUtil.isNotNull(item.getData())) {
ScheduleJob job = item.getData();
LogHelper.setMdcFlinkJobAndAlias(job.getFlinkJobId(), job.getAlias());
logger.info("Receive job[{}]({}): {}", item.getTraceId(), item.getCreateTime(), item.getData());
// Distributed lock scoped to this flink job + table alias.
String lockPath = NameHelper.compactionLauncherLockPath(job.getFlinkJobId(), job.getAlias());
InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath);
try {
if (lock.acquire(2, TimeUnit.SECONDS)) {
// A "running" marker node means another compaction for this table is in flight.
Stat stat = zookeeperClient.checkExists().forPath(NameHelper.compactionRunningLockPath(job.getFlinkJobId(), job.getAlias()));
if (ObjectUtil.isNotNull(stat)) {
logger.info("Job {} {} is running", job.getFlinkJobId(), job.getAlias());
// Park the already-running job for a later retry.
holder.add(item);
// Move to the next round; the outer finally still polls the next
// item, so a plain continue is safe here.
continue;
}
FlinkJob flinkJob = infoService.flinkJobDetail(job.getFlinkJobId());
TableMeta meta = infoService.tableMetaDetail(job.getFlinkJobId(), job.getAlias());
// Tables tagged "no compact" are skipped entirely.
if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) {
logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias());
clearHolder(holder);
continue;
}
logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias());
// Bail out early when the Hudi table does not exist yet.
if (!hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias())) {
logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias());
clearHolder(holder);
continue;
}
// Collect the pending compaction instants for this table.
ImmutableList<HudiInstant> selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias());
if (ObjectUtil.isEmpty(selectedInstants)) {
logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias());
clearHolder(holder);
continue;
}
logger.info("[{}] [{}] Selected Instants: {}", flinkJob.getId(), meta.getAlias(), selectedInstants.makeString(","));
// Estimate the number of files to compact ...
long count = predictCompactFileCount(meta, selectedInstants);
// NOTE(review): count is a primitive long, so isNotNull is always true
// after autoboxing — likely the reason for @SuppressWarnings above; confirm intent.
if (ObjectUtil.isNotNull(count)) {
// ... and derive the compaction task parallelism from it.
long parallelism = predictParallelism(count);
logger.info("[{}] [{}] Predict compact files: {} {}", flinkJob.getId(), meta.getAlias(), count, parallelism);
meta.getHudi().setCompactionTasks((int) parallelism);
}
logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias());
// Launch the compaction on the cluster, retrying per RETRY_POLICY.
String applicationId = Failsafe.with(RETRY_POLICY)
.get(() -> executorService.runCompaction(
job.getBatch(),
flinkJob,
meta,
hadoopConfiguration.getKerberosKeytabPath(),
hadoopConfiguration.getKerberosPrincipal(),
selectedInstants.collect(HudiInstant::getTimestamp).makeString(","),
clusterConfiguration.getCluster()
).toString());
Failsafe.with(RETRY_POLICY)
.run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId));
// A job was executed: release all parked jobs back to the queue.
clearHolder(holder);
} else {
logger.warn("Un acquire lock for " + item.getId());
holder.add(item);
}
} catch (Exception e) {
logger.warn(StrUtil.format("[{}] [{}] Try lock something wrong ", job.getFlinkJobId(), job.getAlias()), e);
// Track a per-item failure counter; drop the item after more than 5 failures.
String failCount = item.getMetadata(Constants.SCHEDULE_JOB_FAIL_COUNT);
if (StrUtil.isNotBlank(failCount)) {
int fail = Integer.parseInt(failCount);
if (fail > 5) {
logger.error("Job {} cause unaccepted error", item);
continue;
} else {
item.getMetadata().put(Constants.SCHEDULE_JOB_FAIL_COUNT, String.valueOf(fail + 1));
}
} else {
item.getMetadata().put(Constants.SCHEDULE_JOB_FAIL_COUNT, "1");
}
// Requeue the failed item so it is retried later.
QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, item);
} finally {
// Always attempt to release the lock if this process holds it.
try {
if (lock.isAcquiredInThisProcess()) {
lock.release();
}
} catch (Exception e) {
logger.error("Release lock failure " + lockPath, e);
}
}
} else {
logger.warn("Schedule job is empty. [{}]({}): {}", item.getTraceId(), item.getCreateTime(), item);
}
} finally {
// Always poll the next item and clear the MDC labels for this iteration.
item = QueueUtil.poll(discoveryClient, this.mapper, clusterConfiguration.getCompactionQueueName());
LogHelper.removeMdc(Constants.LOG_JOB_ID_LABEL, Constants.LOG_FLINK_JOB_ID_LABEL, Constants.LOG_ALIAS_LABEL);
}
}
// Loop ended with the queue empty: requeue anything still parked.
clearHolder(holder);
}
private void clearHolder(MutableList<QueueItem<ScheduleJob>> holder) {
    // Nothing parked -> nothing to requeue.
    if (holder.isEmpty()) {
        return;
    }
    // Back off for half a minute so deferred jobs are not picked up again immediately.
    ThreadUtil.safeSleep(Constants.HALF_MINUTE);
    holder.forEach(deferred -> QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, deferred));
    holder.clear();
}
/**
 * Estimate how many files the pending compactions will touch by summing the
 * operation counts of every selected compaction plan (plans are read in
 * parallel on the shared EXECUTOR). Falls back to the table's bucket index
 * number when no operations could be counted.
 */
private Long predictCompactFileCount(TableMeta meta, ImmutableList<HudiInstant> selectedInstants) {
    long totalOperations = selectedInstants.asParallel(EXECUTOR, 1)
            .sumOfLong(instant -> {
                try {
                    HudiCompactionPlan plan = hudiService.readCompactionPlan(meta.getJob().getId(), meta.getAlias(), instant.getTimestamp());
                    return plan.getOperations().size();
                } catch (Exception e) {
                    // Unreadable plan counts as zero; the error is logged, not propagated.
                    logger.error("Read compaction plan failure", e);
                    return 0;
                }
            });
    return totalOperations < 1 ? meta.getHudi().getBucketIndexNumber() : totalOperations;
}
/**
 * Derive the compaction parallelism from the predicted file count: one task per
 * FILE_PER_PARALLELISM files, clamped to [MIN_PARALLELISM, MAX_PARALLELISM].
 */
private Integer predictParallelism(Long count) {
    long raw = Math.round(count * 1.0 / FILE_PER_PARALLELISM);
    long clamped = Math.max(MIN_PARALLELISM, Math.min(MAX_PARALLELISM, raw));
    return Math.toIntExact(clamped);
}
/**
 * Deserialize a raw queue payload into a {@code QueueItem<ScheduleJob>}.
 *
 * @param body raw JSON text taken from the queue; may be null or blank
 * @return the parsed item, or {@code null} when the body is blank or unparsable
 */
private QueueItem<ScheduleJob> deserialize(String body) {
    if (StrUtil.isBlank(body)) {
        return null;
    }
    try {
        return mapper.readValue(body, new TypeReference<QueueItem<ScheduleJob>>() {
        });
    } catch (Exception error) {
        // Previously caught Throwable, which would also swallow JVM Errors
        // (OutOfMemoryError, etc.); catch Exception so fatal errors propagate.
        logger.error("Schedule job parse error", error);
        return null;
    }
}
}

View File

@@ -0,0 +1,61 @@
package com.lanyuanxiaoyao.service.launcher.configuration;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Cluster-related configuration bound from the {@code connector.cluster} prefix.
 *
 * @author ZhangJiacheng
 * @date 2023-05-09
 */
@ConfigurationProperties("connector.cluster")
@Component
public class ClusterConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(ClusterConfiguration.class);
    private String cluster;
    private String syncQueueName;
    private String compactionQueueName;

    // Log the bound values once the properties have been injected.
    @PostConstruct
    private void init() {
        logger.info("Configuration initial: {}", this);
    }

    public String getCluster() {
        return cluster;
    }

    public void setCluster(String cluster) {
        this.cluster = cluster;
    }

    public String getSyncQueueName() {
        return syncQueueName;
    }

    public void setSyncQueueName(String syncQueueName) {
        this.syncQueueName = syncQueueName;
    }

    public String getCompactionQueueName() {
        return compactionQueueName;
    }

    public void setCompactionQueueName(String compactionQueueName) {
        this.compactionQueueName = compactionQueueName;
    }

    @Override
    public String toString() {
        return String.format(
                "ClusterConfiguration{cluster='%s', syncQueueName='%s', compactionQueueName='%s'}",
                cluster, syncQueueName, compactionQueueName);
    }
}

View File

@@ -0,0 +1,51 @@
package com.lanyuanxiaoyao.service.launcher.configuration;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Hadoop/Kerberos configuration bound from the {@code connector.hadoop} prefix.
 *
 * @author ZhangJiacheng
 * @date 2022-03-30
 */
@ConfigurationProperties("connector.hadoop")
@Component
public class HadoopConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(HadoopConfiguration.class);
    private String kerberosPrincipal;
    private String kerberosKeytabPath;

    // Log the bound values once the properties have been injected.
    @PostConstruct
    private void init() {
        logger.info("Configuration initial: {}", this);
    }

    public String getKerberosPrincipal() {
        return kerberosPrincipal;
    }

    public void setKerberosPrincipal(String kerberosPrincipal) {
        this.kerberosPrincipal = kerberosPrincipal;
    }

    public String getKerberosKeytabPath() {
        return kerberosKeytabPath;
    }

    public void setKerberosKeytabPath(String kerberosKeytabPath) {
        this.kerberosKeytabPath = kerberosKeytabPath;
    }

    @Override
    public String toString() {
        return String.format(
                "HadoopConfiguration{kerberosPrincipal='%s', kerberosKeytabPath='%s'}",
                kerberosPrincipal, kerberosKeytabPath);
    }
}

View File

@@ -0,0 +1,61 @@
package com.lanyuanxiaoyao.service.launcher.configuration;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Hudi-related configuration bound from the {@code connector.hudi} prefix.
 *
 * @author ZhangJiacheng
 * @date 2023-02-03
 */
@ConfigurationProperties("connector.hudi")
@Component
public class HudiConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(HudiConfiguration.class);
    private String appHdfsPath;
    private String victoriaPushUrl;
    private String lokiPushUrl;

    // Log the bound values once the properties have been injected.
    @PostConstruct
    private void init() {
        logger.info("Configuration initial: {}", this);
    }

    public String getAppHdfsPath() {
        return appHdfsPath;
    }

    public void setAppHdfsPath(String appHdfsPath) {
        this.appHdfsPath = appHdfsPath;
    }

    public String getVictoriaPushUrl() {
        return victoriaPushUrl;
    }

    public void setVictoriaPushUrl(String victoriaPushUrl) {
        this.victoriaPushUrl = victoriaPushUrl;
    }

    public String getLokiPushUrl() {
        return lokiPushUrl;
    }

    public void setLokiPushUrl(String lokiPushUrl) {
        this.lokiPushUrl = lokiPushUrl;
    }

    @Override
    public String toString() {
        return String.format(
                "HudiConfiguration{appHdfsPath='%s', victoriaPushUrl='%s', lokiPushUrl='%s'}",
                appHdfsPath, victoriaPushUrl, lokiPushUrl);
    }
}

View File

@@ -0,0 +1,41 @@
package com.lanyuanxiaoyao.service.launcher.configuration;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Zookeeper configuration bound from the {@code connector.zookeeper} prefix.
 *
 * @author ZhangJiacheng
 * @date 2023-05-04
 */
@ConfigurationProperties("connector.zookeeper")
@Component
public class ZookeeperConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfiguration.class);
    private String connectUrl;

    // Log the bound values once the properties have been injected.
    @PostConstruct
    private void init() {
        logger.info("Configuration initial: {}", this);
    }

    public String getConnectUrl() {
        return connectUrl;
    }

    public void setConnectUrl(String connectUrl) {
        this.connectUrl = connectUrl;
    }

    @Override
    public String toString() {
        return String.format("ZookeeperConfiguration{connectUrl='%s'}", connectUrl);
    }
}

View File

@@ -0,0 +1,40 @@
package com.lanyuanxiaoyao.service.launcher.synchronizer.controller;
import com.lanyuanxiaoyao.service.launcher.synchronizer.service.SynchronizerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints that start and stop the synchronizer job.
 *
 * @author ZhangJiacheng
 * @date 2023-12-07
 */
@RestController
@RequestMapping("launcher/synchronizer")
public class SynchronizerController {
    private static final Logger logger = LoggerFactory.getLogger(SynchronizerController.class);
    private final SynchronizerService synchronizerService;

    public SynchronizerController(@Qualifier("sync-service") SynchronizerService synchronizerService) {
        this.synchronizerService = synchronizerService;
    }

    /**
     * Start the sync job for the given Flink job id.
     *
     * @return the YARN application id of the launched job as a string
     */
    @GetMapping("start")
    public String start(@RequestParam("flink_job_id") Long flinkJobId) throws Exception {
        logger.info("Try to start job: {}", flinkJobId);
        return synchronizerService.start(flinkJobId);
    }

    /** Stop the sync job for the given Flink job id. */
    @GetMapping("stop")
    public void stop(@RequestParam("flink_job_id") Long flinkJobId) throws Exception {
        // Fixed copy-paste bug: this previously logged "Try to start job".
        logger.info("Try to stop job: {}", flinkJobId);
        synchronizerService.stop(flinkJobId);
    }
}

View File

@@ -0,0 +1,137 @@
package com.lanyuanxiaoyao.service.launcher.synchronizer.service;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.Constants;
import com.eshore.odcp.hudi.connector.entity.RunMeta;
import com.eshore.odcp.hudi.connector.utils.NameHelper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.launcher.ExecutorService;
import com.lanyuanxiaoyao.service.launcher.configuration.ClusterConfiguration;
import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration;
import com.lanyuanxiaoyao.service.launcher.configuration.ZookeeperConfiguration;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.stereotype.Service;
/**
 * Sync job launcher: starts and stops the synchronizer Flink application on
 * YARN, guarded by Zookeeper-based distributed locks.
 *
 * @author ZhangJiacheng
 * @date 2023-12-07
 */
@Service("sync-service")
public class SynchronizerService {
    private static final Logger logger = LoggerFactory.getLogger(SynchronizerService.class);
    // Retry policy: up to 3 attempts, 1-second delay, on any Exception.
    private static final RetryPolicy<ApplicationId> RETRY_POLICY = RetryPolicy.<ApplicationId>builder()
            .handle(Exception.class)
            .withDelay(Duration.ofSeconds(1))
            .withMaxAttempts(3)
            .build();
    private final HadoopConfiguration hadoopConfiguration;
    private final ClusterConfiguration clusterConfiguration;
    private final CuratorFramework zookeeperClient;
    private final ExecutorService executorService;
    private final YarnClient yarnClient;
    private final ObjectMapper mapper;

    public SynchronizerService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ExecutorService executorService, ZookeeperConfiguration zookeeperConfiguration, Jackson2ObjectMapperBuilder builder) {
        this.hadoopConfiguration = hadoopConfiguration;
        this.clusterConfiguration = clusterConfiguration;
        this.executorService = executorService;
        mapper = builder.build();
        // YARN client used to kill running sync applications in stop().
        yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new Configuration());
        yarnClient.start();
        // Curator client backing the start/stop distributed locks.
        zookeeperClient = CuratorFrameworkFactory.builder()
                .connectString(zookeeperConfiguration.getConnectUrl())
                .retryPolicy(new ExponentialBackoffRetry((int) Constants.SECOND, 3))
                .sessionTimeoutMs((int) Constants.SECOND)
                .connectionTimeoutMs((int) (30 * Constants.SECOND))
                .build();
        zookeeperClient.start();
    }

    /** Release the YARN and Zookeeper clients on context shutdown. */
    @PreDestroy
    public void destroy() throws IOException {
        if (ObjectUtil.isNotNull(yarnClient)) {
            yarnClient.stop();
            yarnClient.close();
        }
        if (ObjectUtil.isNotNull(this.zookeeperClient)) {
            this.zookeeperClient.close();
        }
    }

    /**
     * Launch the sync job for the given Flink job id under a distributed lock.
     *
     * @param flinkJobId id of the Flink job to start
     * @return the YARN application id of the launched job as a string
     * @throws RuntimeException if the job is already running or the lock cannot
     *                          be acquired within 2 seconds
     */
    public String start(Long flinkJobId) throws Exception {
        String lockPath = NameHelper.syncLauncherLockPath(flinkJobId);
        InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath);
        try {
            if (lock.acquire(2, TimeUnit.SECONDS)) {
                // A "running" marker node means the job is already in flight.
                Stat stat = zookeeperClient.checkExists().forPath(NameHelper.syncRunningLockPath(flinkJobId, null));
                if (ObjectUtil.isNotNull(stat)) {
                    String message = StrUtil.format("Job {} is running", flinkJobId);
                    logger.warn(message);
                    throw new RuntimeException(message);
                }
                // Launch on the cluster, retrying per RETRY_POLICY.
                ApplicationId applicationId = Failsafe
                        .with(RETRY_POLICY)
                        .get(() -> executorService.runSync(
                                flinkJobId,
                                hadoopConfiguration.getKerberosKeytabPath(),
                                hadoopConfiguration.getKerberosPrincipal(),
                                clusterConfiguration.getCluster()
                        ));
                return applicationId.toString();
            } else {
                String message = StrUtil.format("Un acquire lock for {}", flinkJobId);
                logger.warn(message);
                throw new RuntimeException(message);
            }
        } finally {
            // Always release the lock if this process holds it.
            if (lock.isAcquiredInThisProcess()) {
                lock.release();
            }
        }
    }

    /**
     * Stop the running sync job for the given Flink job id by killing its YARN
     * application. A no-op when no running marker exists.
     *
     * @throws RuntimeException if the marker exists but carries no application id
     */
    public void stop(Long flinkJobId) throws Exception {
        String syncLockPath = NameHelper.syncRunningLockPath(flinkJobId, null);
        Stat stat = zookeeperClient.checkExists().forPath(syncLockPath);
        if (ObjectUtil.isNull(stat)) {
            // No running marker -> nothing to stop.
            return;
        }
        byte[] bytes = zookeeperClient.getData().forPath(syncLockPath);
        // Parse the bytes directly with Jackson: the previous new String(bytes)
        // used the platform default charset and could mis-decode the payload.
        RunMeta meta = mapper.readValue(bytes, RunMeta.class);
        logger.info("Run meta: {}", meta);
        String applicationIdText = meta.getApplicationId();
        if (StrUtil.isBlank(applicationIdText)) {
            throw new RuntimeException(StrUtil.format("Cannot parse application id from zookeeper: {}", syncLockPath));
        }
        ApplicationId applicationId = ApplicationId.fromString(applicationIdText);
        Failsafe.with(RETRY_POLICY)
                .run(() -> yarnClient.killApplication(applicationId));
        logger.info("{} have be killed", applicationId);
    }
}

View File

@@ -0,0 +1,64 @@
package com.lanyuanxiaoyao.service.launcher.utils;
import cn.hutool.core.util.ObjectUtil;
import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Hadoop helper utilities for building configurations and file systems.
 *
 * @author ZhangJiacheng
 * @date 2022-04-25
 */
public class HadoopUtil {
    private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);

    // Utility class: prevent instantiation.
    private HadoopUtil() {
    }

    /** Create a default Hadoop configuration without Kerberos login. */
    public static Configuration createConfiguration() throws IOException {
        return createConfiguration(null);
    }

    /**
     * Create a Hadoop configuration and, when security is enabled, perform a
     * Kerberos keytab login as a side effect.
     *
     * @param configuration Kerberos principal/keytab settings; may be null
     * @return a freshly constructed Hadoop {@link Configuration}
     * @throws IOException if the Kerberos login fails
     */
    public static Configuration createConfiguration(HadoopConfiguration configuration) throws IOException {
        Configuration hadoopConfiguration = new Configuration();
        if (ObjectUtil.isNull(configuration)) {
            return hadoopConfiguration;
        }
        UserGroupInformation.setConfiguration(hadoopConfiguration);
        if (UserGroupInformation.isSecurityEnabled()) {
            UserGroupInformation.loginUserFromKeytab(configuration.getKerberosPrincipal(), configuration.getKerberosKeytabPath());
        }
        return hadoopConfiguration;
    }

    /** Create a file system backed by a default configuration. */
    public static FileSystem createFileSystem() {
        return createFileSystem(null);
    }

    /**
     * Create a file system from the given configuration (default when null).
     *
     * @throws RuntimeException wrapping the underlying {@link IOException}
     */
    public static FileSystem createFileSystem(Configuration configuration) {
        try {
            if (ObjectUtil.isNull(configuration)) {
                return FileSystem.get(new Configuration());
            }
            return FileSystem.get(configuration);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * List the statuses under a path, returning an empty list on I/O failure
     * (the error is logged, not propagated).
     */
    public static ImmutableList<FileStatus> list(FileSystem fileSystem, Path path) {
        try {
            return Lists.immutable.of(fileSystem.listStatus(path));
        } catch (IOException e) {
            logger.error("List error", e);
            return Lists.immutable.empty();
        }
    }
}

View File

@@ -0,0 +1,30 @@
package com.lanyuanxiaoyao.service.launcher.utils;
import cn.hutool.core.util.ObjectUtil;
import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Jackson (flink-shaded) mapper holder.
 *
 * @author ZhangJiacheng
 * @date 2022-06-12
 */
public class JacksonUtil implements Serializable {
    private static final Logger logger = LoggerFactory.getLogger(JacksonUtil.class);
    // Eagerly initialized: the previous lazy null-check was not thread-safe —
    // two threads could race and one could observe a partially configured mapper.
    private static final ObjectMapper INSTANCE = createMapper();

    // Utility class: prevent instantiation.
    private JacksonUtil() {
    }

    private static ObjectMapper createMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        return mapper;
    }

    /** @return the shared, pre-configured {@link ObjectMapper} (thread-safe). */
    public static ObjectMapper getMapper() {
        return INSTANCE;
    }
}

View File

@@ -0,0 +1,7 @@
spring:
application:
name: service-launcher
profiles:
include: random-port,common,discovery,metrics,forest
forest:
backend: httpclient

View File

@@ -0,0 +1,53 @@
<configuration>
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<springProperty scope="context" name="LOKI_PUSH_URL" source="loki.url"/>
<springProperty scope="context" name="LOGGING_PARENT" source="logging.parent"/>
<springProperty scope="context" name="APP_NAME" source="spring.application.name"/>
<springProperty scope="context" name="APP_CLUSTER" source="connector.cluster.cluster"/>
<appender name="Loki" class="com.github.loki4j.logback.Loki4jAppender">
<metricsEnabled>true</metricsEnabled>
<http class="com.github.loki4j.logback.ApacheHttpSender">
<url>${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push}</url>
</http>
<format>
<label>
<pattern>cluster=${APP_CLUSTER:-none},app=${APP_NAME:-none},host=${HOSTNAME:-none},level=%level,job_id=%mdc{LOG_JOB_ID_LABEL:-none},flink_job_id=%mdc{LOG_FLINK_JOB_ID_LABEL:-none},alias=%mdc{LOG_ALIAS_LABEL:-none}</pattern>
<readMarkers>true</readMarkers>
</label>
<message>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx</pattern>
</message>
<sortByTime>true</sortByTime>
</format>
</appender>
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %clr(%5p) %clr([${HOSTNAME}]){yellow} %clr([%t]){magenta} %clr(%logger{40}){cyan} #@# %m%n%wEx</pattern>
</encoder>
</appender>
<appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOGGING_PARENT:-.}/${APP_NAME:-run}.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz</fileNamePattern>
<MaxHistory>7</MaxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx</pattern>
</encoder>
</appender>
<logger name="com.zaxxer.hikari" level="ERROR"/>
<logger name="com.netflix.discovery.shared.resolver.aws.ConfigClusterResolver" level="WARN"/>
<root level="INFO">
<appender-ref ref="Loki"/>
<appender-ref ref="Console"/>
<appender-ref ref="RollingFile"/>
</root>
</configuration>

View File

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.flink.yarn.YarnClusterClientFactory

View File

@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory

View File

@@ -0,0 +1 @@
com.eshore.odcp.hudi.connector.utils.executor.metrics.VictoriaMetricsReporterFactory

View File

@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.flink.table.planner.delegation.DefaultExecutorFactory
org.apache.flink.table.planner.delegation.DefaultParserFactory
org.apache.flink.table.planner.delegation.DefaultPlannerFactory