diff --git a/bin/build-launcher.sh b/bin/build-launcher.sh
new file mode 100755
index 0000000..0f7fd92
--- /dev/null
+++ b/bin/build-launcher.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+echo 'Build b2a4'
+mvn -pl service-launcher clean package -D skipTests -s ~/.m2/settings-development.xml -P b2a4
+ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2a4-1.0.0-SNAPSHOT.jar
+rm /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2a4-1.0.0-SNAPSHOT.jar
+
+echo 'Build b2b1'
+mvn -pl service-launcher clean package -D skipTests -s ~/.m2/settings-development.xml -P b2b1
+ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b1-1.0.0-SNAPSHOT.jar
+rm /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b1-1.0.0-SNAPSHOT.jar
+
+echo 'Build b2b5'
+mvn -pl service-launcher clean package -D skipTests -s ~/.m2/settings-development.xml -P b2b5
+ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b5-1.0.0-SNAPSHOT.jar
+rm /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b5-1.0.0-SNAPSHOT.jar
+
+echo 'Build b2b12'
+mvn -pl service-launcher clean package -D skipTests -s ~/.m2/settings-development.xml -P b2b12
+ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b12-1.0.0-SNAPSHOT.jar
+rm /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b12-1.0.0-SNAPSHOT.jar
diff --git a/pom.xml b/pom.xml
index 64a28c3..48e000f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,7 @@
service-check
service-api
service-scheduler
+ service-launcher
@@ -50,33 +51,21 @@
- b1e1
+ b2a4
- b1e1
+ b2a4
- b1e11
+ b2b1
- b1e11
+ b2b1
- b2e1
+ b2b5
- b2e1
-
-
-
- b5s119
-
- b5s119
-
-
-
- b2s119
-
- b2s119
+ b2b5
@@ -85,12 +74,6 @@
b2b12
-
- b9b9
-
- b9b9
-
-
@@ -181,6 +164,11 @@
maven-source-plugin
3.3.0
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.3.0
+
org.springframework.boot
spring-boot-maven-plugin
diff --git a/service-cli/service-cli-runner/src/main/resources/application-b12.yml b/service-cli/service-cli-runner/src/main/resources/application-b12.yml
index c0d4cca..9e35808 100644
--- a/service-cli/service-cli-runner/src/main/resources/application-b12.yml
+++ b/service-cli/service-cli-runner/src/main/resources/application-b12.yml
@@ -32,13 +32,13 @@ deploy:
services:
service-api:
replicas: 10
- service-launcher-runner-b1:
+ service-launcher-b1:
replicas: 6
- service-launcher-runner-b5:
+ service-launcher-b5:
replicas: 6
- service-launcher-runner-a4:
+ service-launcher-a4:
replicas: 6
- service-launcher-runner-b12:
+ service-launcher-b12:
replicas: 6
service-info-query:
replicas: 10
diff --git a/service-cli/service-cli-runner/src/main/resources/application.yml b/service-cli/service-cli-runner/src/main/resources/application.yml
index 92c7f66..2768a08 100644
--- a/service-cli/service-cli-runner/src/main/resources/application.yml
+++ b/service-cli/service-cli-runner/src/main/resources/application.yml
@@ -20,13 +20,13 @@ deploy:
- "service-hudi"
source-jar: service-scheduler-1.0.0-SNAPSHOT.jar
replicas: 1
- service-launcher-runner-b1:
+ service-launcher-b1:
order: 4
groups:
- "service"
- "service-hudi"
- "service-hudi-launcher"
- source-jar: service-launcher-runner-b2b1-1.0.0-SNAPSHOT.jar
+ source-jar: service-launcher-b2b1-1.0.0-SNAPSHOT.jar
replicas: 6
environments:
connector_hadoop_kerberos-principal: ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM
@@ -35,18 +35,18 @@ deploy:
connector_hudi_victoria-push-url: ${deploy.runtime.hudi.victoria-push-url}
connector_hudi_loki-push-url: ${deploy.runtime.hudi.loki-push-url}
arguments:
- spring_application_name: service-launcher-runner-b1
+ spring_application_name: service-launcher-b1
connector_cluster_cluster: b1
connector_cluster_sync-queue-name: sync-queue-b1
connector_cluster_compaction-queue-name: compaction-queue-b1
connector_zookeeper_connect-url: ${deploy.runtime.connector-zk-url}
- service-launcher-runner-b5:
+ service-launcher-b5:
order: 4
groups:
- "service"
- "service-hudi"
- "service-hudi-launcher"
- source-jar: service-launcher-runner-b2b5-1.0.0-SNAPSHOT.jar
+ source-jar: service-launcher-b2b5-1.0.0-SNAPSHOT.jar
replicas: 6
environments:
connector_hadoop_kerberos-principal: ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM
@@ -55,18 +55,18 @@ deploy:
connector_hudi_victoria-push-url: ${deploy.runtime.hudi.victoria-push-url}
connector_hudi_loki-push-url: ${deploy.runtime.hudi.loki-push-url}
arguments:
- spring_application_name: service-launcher-runner-b5
+ spring_application_name: service-launcher-b5
connector_cluster_cluster: b5
connector_cluster_sync-queue-name: sync-queue-b5
connector_cluster_compaction-queue-name: compaction-queue-b5
connector_zookeeper_connect-url: ${deploy.runtime.connector-zk-url}
- service-launcher-runner-a4:
+ service-launcher-a4:
order: 4
groups:
- "service"
- "service-hudi"
- "service-hudi-launcher"
- source-jar: service-launcher-runner-b2a4-1.0.0-SNAPSHOT.jar
+ source-jar: service-launcher-b2a4-1.0.0-SNAPSHOT.jar
replicas: 6
environments:
connector_hadoop_kerberos-principal: ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM
@@ -75,18 +75,18 @@ deploy:
connector_hudi_victoria-push-url: ${deploy.runtime.hudi.victoria-push-url}
connector_hudi_loki-push-url: ${deploy.runtime.hudi.loki-push-url}
arguments:
- spring_application_name: service-launcher-runner-a4
+ spring_application_name: service-launcher-a4
connector_cluster_cluster: a4
connector_cluster_sync-queue-name: sync-queue-a4
connector_cluster_compaction-queue-name: compaction-queue-a4
connector_zookeeper_connect-url: ${deploy.runtime.connector-zk-url}
- service-launcher-runner-b12:
+ service-launcher-b12:
order: 4
groups:
- "service"
- "service-hudi"
- "service-hudi-launcher"
- source-jar: service-launcher-runner-b2b12-1.0.0-SNAPSHOT.jar
+ source-jar: service-launcher-b2b12-1.0.0-SNAPSHOT.jar
replicas: 6
environments:
connector_hadoop_kerberos-principal: ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM
@@ -95,7 +95,7 @@ deploy:
connector_hudi_victoria-push-url: ${deploy.runtime.hudi.victoria-push-url}
connector_hudi_loki-push-url: ${deploy.runtime.hudi.loki-push-url}
arguments:
- spring_application_name: service-launcher-runner-b12
+ spring_application_name: service-launcher-b12
connector_cluster_cluster: b12
connector_cluster_sync-queue-name: sync-queue-b12
connector_cluster_compaction-queue-name: compaction-queue-b12
diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/A4LauncherService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/A4LauncherService.java
index 55fe4e6..f36e862 100644
--- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/A4LauncherService.java
+++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/A4LauncherService.java
@@ -7,6 +7,6 @@ import com.lanyuanxiaoyao.service.forest.service.launcher.LauncherService;
* @author lanyuanxiaoyao
* @date 2023-06-06
*/
-@BaseRequest(baseURL = "http://service-launcher-runner-a4")
+@BaseRequest(baseURL = "http://service-launcher-a4")
public interface A4LauncherService extends LauncherService {
}
diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B12LauncherService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B12LauncherService.java
index b4c3fb1..1009ab0 100644
--- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B12LauncherService.java
+++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B12LauncherService.java
@@ -7,6 +7,6 @@ import com.lanyuanxiaoyao.service.forest.service.launcher.LauncherService;
* @author lanyuanxiaoyao
* @date 2023-06-06
*/
-@BaseRequest(baseURL = "http://service-launcher-runner-b12")
+@BaseRequest(baseURL = "http://service-launcher-b12")
public interface B12LauncherService extends LauncherService {
}
diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B1LauncherService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B1LauncherService.java
index a674a63..2659d45 100644
--- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B1LauncherService.java
+++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B1LauncherService.java
@@ -9,6 +9,6 @@ import com.lanyuanxiaoyao.service.forest.service.launcher.LauncherService;
* @author lanyuanxiaoyao
* @date 2023-06-06
*/
-@BaseRequest(baseURL = "http://service-launcher-runner-b1")
+@BaseRequest(baseURL = "http://service-launcher-b1")
public interface B1LauncherService extends LauncherService {
}
diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B5LauncherService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B5LauncherService.java
index a14d558..a61c76f 100644
--- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B5LauncherService.java
+++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B5LauncherService.java
@@ -7,6 +7,6 @@ import com.lanyuanxiaoyao.service.forest.service.launcher.LauncherService;
* @author lanyuanxiaoyao
* @date 2023-06-06
*/
-@BaseRequest(baseURL = "http://service-launcher-runner-b5")
+@BaseRequest(baseURL = "http://service-launcher-b5")
public interface B5LauncherService extends LauncherService {
}
diff --git a/service-launcher/pom.xml b/service-launcher/pom.xml
new file mode 100644
index 0000000..08ff946
--- /dev/null
+++ b/service-launcher/pom.xml
@@ -0,0 +1,176 @@
+
+
+ 4.0.0
+
+ com.lanyuanxiaoyao
+ hudi-service
+ 1.0.0-SNAPSHOT
+
+
+ service-launcher
+
+
+
+ com.lanyuanxiaoyao
+ service-configuration
+ 1.0.0-SNAPSHOT
+
+
+ org.apache.logging.log4j
+ log4j-to-slf4j
+
+
+
+
+ org.apache.curator
+ curator-recipes
+ 5.1.0
+
+
+ org.apache.hudi
+ hudi-flink${flink.major.version}-bundle
+ ${hudi.version}
+
+
+ com.lanyuanxiaoyao
+ service-forest
+ 1.0.0-SNAPSHOT
+
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
+ org.apache.logging.log4j
+ log4j-to-slf4j
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+
+
+ com.eshore.odcp.hudi.connector
+ executor
+ 1.0.0-SNAPSHOT
+
+
+ com.sun.jersey
+ jersey-client
+ 1.19.4
+
+
+ com.sun.jersey
+ jersey-core
+ 1.19.4
+
+
+ com.sun.jersey.contribs
+ jersey-apache-client4
+ 1.19.4
+
+
+ org.apache.pulsar
+ pulsar-client
+ runtime
+
+
+
+
+ ${artifactId}-${build-tag}-${version}
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+ 3.2.0
+
+
+ copy-config-file
+ validate
+
+ copy-resources
+
+
+ ${project.build.directory}/classes
+
+
+ ${project.parent.basedir}/config/${build-tag}
+
+ *.xml
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+ false
+ true
+
+
+ META-INF/spring.handlers
+
+
+ META-INF/spring.factories
+
+
+ META-INF/spring.schemas
+
+
+
+ com.lanyuanxiaoyao.service.launcher.LauncherRunnerApplication
+
+
+
+ reference.conf
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+ core-default.xml
+ hdfs-default.xml
+ yarn-default.xml
+ log4j-surefire*.properties
+
+
+
+
+
+
+ package
+
+ shade
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring-boot.version}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java
new file mode 100644
index 0000000..9e457bc
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java
@@ -0,0 +1,355 @@
+package com.lanyuanxiaoyao.service.launcher;
+
+import cn.hutool.core.lang.Tuple;
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.ReUtil;
+import cn.hutool.core.util.StrUtil;
+import com.eshore.odcp.hudi.connector.Constants;
+import com.eshore.odcp.hudi.connector.entity.FlinkJob;
+import com.eshore.odcp.hudi.connector.entity.TableMeta;
+import com.eshore.odcp.hudi.connector.exception.CheckpointRootPathNotFoundException;
+import com.eshore.odcp.hudi.connector.utils.NameHelper;
+import com.eshore.odcp.hudi.connector.utils.executor.Runner;
+import com.lanyuanxiaoyao.service.forest.service.InfoService;
+import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration;
+import com.lanyuanxiaoyao.service.launcher.configuration.HudiConfiguration;
+import com.lanyuanxiaoyao.service.launcher.utils.HadoopUtil;
+import com.lanyuanxiaoyao.service.launcher.utils.JacksonUtil;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import javax.annotation.Resource;
+import org.apache.flink.client.cli.ClientOptions;
+import org.apache.flink.configuration.*;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hudi.common.util.collection.Pair;
+import org.eclipse.collections.api.factory.Lists;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import static com.eshore.odcp.hudi.connector.Constants.HALF_HOUR;
+import static com.eshore.odcp.hudi.connector.Constants.MINUTE;
+
+/**
+ * 执行器
+ *
+ * @author ZhangJiacheng
+ * @date 2022-06-06
+ */
+@Service
+public class ExecutorService {
+ private static final Logger logger = LoggerFactory.getLogger(ExecutorService.class);
+ private static final ObjectMapper MAPPER = JacksonUtil.getMapper();
+ private static final Pattern EXECUTOR_JAR_NAME = Pattern.compile(".+sync-(\\d+)\\.jar");
+ @Resource
+ private InfoService infoService;
+ @Resource
+ private HadoopConfiguration hadoopConfiguration;
+ @Resource
+ private HudiConfiguration hudiConfiguration;
+
+ public static String[] syncArgs(FlinkJob flinkJob, ImmutableList tableMetaList, String cluster) throws JsonProcessingException {
+ List argsList = Lists.mutable.empty();
+ argsList.add(Constants.FLINK_JOB_OPTION);
+ argsList.add(MAPPER.writeValueAsString(flinkJob));
+ argsList.add(Constants.TABLE_META_LIST_OPTION);
+ argsList.add(MAPPER.writeValueAsString(tableMetaList));
+ argsList.add(Constants.CLUSTER_OPTION);
+ argsList.add(cluster);
+ return argsList.toArray(new String[]{});
+ }
+
+ public static String[] compactionArgs(FlinkJob flinkJob, TableMeta tableMeta, String instants, String cluster) throws JsonProcessingException {
+ List argsList = Lists.mutable.empty();
+ argsList.add(Constants.FLINK_JOB_OPTION);
+ argsList.add(MAPPER.writeValueAsString(flinkJob));
+ argsList.add(Constants.TABLE_META_OPTION);
+ argsList.add(MAPPER.writeValueAsString(tableMeta));
+ if (ObjectUtil.isNotEmpty(instants)) {
+ argsList.add(Constants.INSTANTS_OPTION);
+ argsList.add(instants);
+ }
+ argsList.add(Constants.CLUSTER_OPTION);
+ argsList.add(cluster);
+ return argsList.toArray(new String[]{});
+ }
+
+ private Memory jobMemory(FlinkJob flinkJob, ImmutableList tableMetas) {
+ int jobMemory = 1024;
+ int jobMetaspaceMemory = 256;
+ int taskMemory = 512;
+ int taskMetaspaceMemory = 256;
+
+ switch (flinkJob.getRunMode()) {
+ case ALL_IN_ONE:
+ jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory);
+ jobMetaspaceMemory = tableMetas.size() * 25;
+ taskMemory = (int) tableMetas.sumOfInt(meta -> meta.getSyncYarn().getTaskManagerMemory());
+ break;
+ case ALL_IN_ONE_BY_SCHEMA:
+ jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory);
+ jobMetaspaceMemory = tableMetas.distinctBy(TableMeta::getSchema).size() * 25;
+ taskMemory = (int) tableMetas.sumByInt(TableMeta::getSchema, meta -> meta.getSyncYarn().getTaskManagerMemory()).max();
+ break;
+ case ALL_IN_ONE_BY_TABLE:
+ jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory);
+ jobMetaspaceMemory = tableMetas.distinctBy(TableMeta::getTable).size() * 25;
+ taskMemory = (int) tableMetas.sumByInt(TableMeta::getTable, meta -> meta.getSyncYarn().getTaskManagerMemory()).max();
+ break;
+ case ONE_IN_ONE:
+ // jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory);
+ jobMemory = flinkJob.getOneInOneSyncYarn().getJobManagerMemory();
+ jobMetaspaceMemory = tableMetas.size() * 25;
+ taskMemory = flinkJob.getOneInOneSyncYarn().getTaskManagerMemory();
+ break;
+ default:
+ }
+
+ taskMetaspaceMemory = Math.min(taskMemory / 3, tableMetas.size() * 25);
+
+ return new Memory(
+ Math.max(512, jobMemory),
+ Math.max(256, jobMetaspaceMemory),
+ Math.max(512, taskMemory),
+ Math.max(256, taskMetaspaceMemory)
+ );
+ }
+
+ private String getLatestExecutorJarPath() throws IOException {
+ try (FileSystem fileSystem = HadoopUtil.createFileSystem(HadoopUtil.createConfiguration(hadoopConfiguration))) {
+ Path root = new Path(hudiConfiguration.getAppHdfsPath());
+ return Lists.immutable.of(fileSystem.listStatus(root))
+ .select(FileStatus::isFile)
+ .collect(FileStatus::getPath)
+ .collect(Path::toString)
+ .select(path -> ReUtil.isMatch(EXECUTOR_JAR_NAME, path))
+ .collect(path -> new Tuple(path, getLatestExecutorJarVersion(path)))
+ .reject(tuple -> tuple.get(1) < 0)
+ .maxByOptional(tuple -> tuple.get(1))
+ .orElseThrow(() -> new RuntimeException("Latest jar path not found"))
+ .get(0);
+ }
+ }
+
+ private Long getLatestExecutorJarVersion(String path) {
+ return Optional.ofNullable(ReUtil.get(EXECUTOR_JAR_NAME, path, 1)).map(Long::valueOf).orElse(-1L);
+ }
+
+ private void setEnvironment(Configuration configuration, String key, String value) {
+ configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + key, value);
+ configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + key, value);
+ }
+
+ private Configuration commonConfiguration(String keytabPath, String principal) {
+ Configuration configuration = new Configuration();
+
+ configuration.setBoolean(JobManagerOptions.JVM_DIRECT_MEMORY_LIMIT_ENABLED, true);
+ // configuration.setString(JobManagerOptions.ARCHIVE_DIR, hudiConfiguration.getArchiveHdfsPath());
+
+ configuration.setString(AkkaOptions.ASK_TIMEOUT, "10 min");
+ configuration.setString(AkkaOptions.TCP_TIMEOUT, "15 min");
+ configuration.setString(AkkaOptions.LOOKUP_TIMEOUT, "10 min");
+ configuration.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(30));
+
+ // Kerberos认证
+ configuration.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, true);
+ if (StrUtil.isNotBlank(keytabPath)) {
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+ }
+ if (StrUtil.isNotBlank(principal)) {
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal);
+ }
+
+ // configuration.setString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS, "0.0.0.0");
+ // configuration.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 8083);
+
+ configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, MINUTE);
+ configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, HALF_HOUR);
+
+ configuration.setString(AkkaOptions.ASK_TIMEOUT, "1 min");
+ configuration.setString(AkkaOptions.TCP_TIMEOUT, "2 min");
+
+ configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+ configuration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "4");
+ // configuration.setString(YarnConfigOptions.STAGING_DIRECTORY, "hdfs://b2/apps/datalake/yarn");
+
+ configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
+ configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
+
+ configuration.setString(RestOptions.BIND_PORT, "8084-9400");
+
+ configuration.setString(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
+
+ if (StrUtil.isNotBlank(hudiConfiguration.getVictoriaPushUrl())) {
+ configuration.setString("metrics.reporter.victoria.class", "com.eshore.odcp.hudi.connector.utils.executor.metrics.VictoriaMetricsReporter");
+ configuration.setString("metrics.reporter.victoria.endpoint", hudiConfiguration.getVictoriaPushUrl());
+ configuration.setBoolean("metrics.reporter.victoria.enable.auth", true);
+ configuration.setString("metrics.reporter.victoria.auth.username", "EsCFVuNkiDWv7PKmcF");
+ configuration.setString("metrics.reporter.victoria.auth.password", "Abf%x9ocS^iKr3tgrd");
+ }
+
+ // 设置 JVM 参数
+ // configuration.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, "-XX:CMSInitiatingOccupancyFraction=40");
+ // configuration.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, "-XX:+UseG1GC -XX:MaxGCPauseMillis=1500 -XX:InitiatingHeapOccupancyPercent=30 -XX:SurvivorRatio=4 -XX:ParallelGCThreads=60");
+
+ return configuration;
+ }
+
+ public ApplicationId runSync(Long flinkJobId, String keytabPath, String principal, String cluster) throws Exception {
+ FlinkJob flinkJob = infoService.flinkJobDetail(flinkJobId);
+ ImmutableList tableMetas = infoService.tableMetaList(flinkJobId);
+ if (ObjectUtil.isEmpty(tableMetas)) {
+ throw new RuntimeException("Table metas not found");
+ }
+ return runSync(flinkJob, tableMetas, keytabPath, principal, cluster);
+ }
+
+ public ApplicationId runSync(FlinkJob flinkJob, ImmutableList tableMetaList, String keytabPath, String principal, String cluster) throws Exception {
+ logger.info("Running Sync: {} {} {}", flinkJob.getId(), keytabPath, principal);
+
+ try (FileSystem fileSystem = HadoopUtil.createFileSystem(HadoopUtil.createConfiguration(hadoopConfiguration))) {
+ String checkpointRootPath = tableMetaList.collect(meta -> meta.getConfig().getCheckpointRootPath())
+ .distinct()
+ .getFirstOptional()
+ .orElseThrow(CheckpointRootPathNotFoundException::new);
+ String checkpointPath = checkpointRootPath + "/" + flinkJob.getId();
+ logger.info("Delete checkpoint path {}", checkpointPath);
+ fileSystem.delete(new Path(checkpointPath), true);
+ }
+
+ Configuration configuration = commonConfiguration(keytabPath, principal);
+
+ Memory memory = jobMemory(flinkJob, tableMetaList);
+ // JobManager的内存
+ MemorySize jobMemorySize = MemorySize.parse(memory.jobMemory + "m");
+ configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, jobMemorySize);
+ configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.parse(memory.jobMetaspaceMemory + "m"));
+
+ // TaskManager的内存
+ MemorySize taskMemorySize = MemorySize.parse(memory.taskMemory + "m");
+ configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, taskMemorySize);
+ configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("0m"));
+ configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.parse(memory.taskMetaspaceMemory + "m"));
+
+ // 并行度
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10);
+
+ configuration.setString(YarnConfigOptions.APPLICATION_NAME, NameHelper.syncJobName(flinkJob.getId(), flinkJob.getName()));
+
+ // configuration.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "hdfs://b2/apps/flink/completed-jobs/");
+ // configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000);
+
+ // 业务jar包
+ String executorJarPath = getLatestExecutorJarPath();
+ logger.info("Executor jar path: {}", executorJarPath);
+ Long executorJarVersion = getLatestExecutorJarVersion(executorJarPath);
+ configuration.set(PipelineOptions.JARS, new ArrayList() {{
+ add(executorJarPath);
+ }});
+
+ String flinkJobName = flinkJob.getName().replaceAll("\\s", "_");
+
+ setEnvironment(configuration, Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_SYNC);
+ setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_ID, String.valueOf(flinkJob.getId()));
+ setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJobName);
+ setEnvironment(configuration, Constants.METRICS_LABEL_EXECUTOR_VERSION, String.valueOf(executorJarVersion));
+ setEnvironment(configuration, Constants.METRICS_LABEL_CLUSTER, cluster);
+
+ setEnvironment(configuration, Constants.LOKI_PUSH_URL, hudiConfiguration.getLokiPushUrl());
+
+ configuration.setString("metrics.reporter.victoria.tags", Lists.immutable.of(
+ Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_SYNC),
+ Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
+ Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJobName),
+ Pair.of(Constants.METRICS_LABEL_EXECUTOR_VERSION, executorJarVersion)
+ ).collect(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).makeString(";"));
+
+ return Runner.run(
+ configuration,
+ "com.eshore.odcp.hudi.connector.sync.Synchronizer",
+ syncArgs(flinkJob, tableMetaList, cluster)
+ );
+ }
+
+ public ApplicationId runCompaction(String batchCode, FlinkJob flinkJob, TableMeta tableMeta, String keytabPath, String principal, String instants, String cluster) throws Exception {
+ logger.info("Running Compaction: {} {} {} {} {}", batchCode, flinkJob.getId(), tableMeta.getAlias(), keytabPath, principal);
+
+ Configuration configuration = commonConfiguration(keytabPath, principal);
+
+ MemorySize jobMemorySize = MemorySize.parse(tableMeta.getCompactionYarn().getJobManagerMemory() + "m");
+ configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, jobMemorySize);
+
+ MemorySize taskMemorySize = MemorySize.parse(tableMeta.getCompactionYarn().getTaskManagerMemory() + "m");
+ configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, taskMemorySize);
+ configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("0m"));
+
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10);
+
+ configuration.setString(YarnConfigOptions.APPLICATION_NAME, NameHelper.compactionJobName(flinkJob.getId(), tableMeta.getAlias()));
+
+ String executorJarPath = getLatestExecutorJarPath();
+ logger.info("Executor jar path: {}", executorJarPath);
+ Long executorJarVersion = getLatestExecutorJarVersion(executorJarPath);
+ configuration.set(PipelineOptions.JARS, new ArrayList() {{
+ add(executorJarPath);
+ }});
+
+ String flinkJobName = flinkJob.getName().replaceAll("\\s", "_");
+
+ setEnvironment(configuration, Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_COMPACTION);
+ setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_ID, String.valueOf(flinkJob.getId()));
+ setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJobName);
+ setEnvironment(configuration, Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema());
+ setEnvironment(configuration, Constants.METRICS_LABEL_TABLE, tableMeta.getTable());
+ setEnvironment(configuration, Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias());
+ setEnvironment(configuration, Constants.METRICS_LABEL_BATCH_ID, batchCode);
+ setEnvironment(configuration, Constants.METRICS_LABEL_EXECUTOR_VERSION, String.valueOf(executorJarVersion));
+ setEnvironment(configuration, Constants.METRICS_LABEL_CLUSTER, cluster);
+
+ setEnvironment(configuration, Constants.LOKI_PUSH_URL, hudiConfiguration.getLokiPushUrl());
+
+ configuration.setString("metrics.reporter.victoria.tags", Lists.immutable.of(
+ Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_COMPACTION),
+ Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
+ Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJob.getName().replaceAll("\\s", "_")),
+ Pair.of(Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()),
+ Pair.of(Constants.METRICS_LABEL_TABLE, tableMeta.getTable()),
+ Pair.of(Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias()),
+ Pair.of(Constants.METRICS_LABEL_BATCH_ID, batchCode),
+ Pair.of(Constants.METRICS_LABEL_EXECUTOR_VERSION, executorJarVersion)
+ ).collect(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).makeString(";"));
+
+ return Runner.run(
+ configuration,
+ "com.eshore.odcp.hudi.connector.sync.Compactor",
+ compactionArgs(flinkJob, tableMeta, instants, cluster)
+ );
+ }
+
+ private static final class Memory {
+ private final int jobMemory;
+ private final int jobMetaspaceMemory;
+ private final int taskMemory;
+ private final int taskMetaspaceMemory;
+
+ public Memory(int jobMemory, int jobMetaspaceMemory, int taskMemory, int taskMetaspaceMemory) {
+ this.jobMemory = jobMemory;
+ this.jobMetaspaceMemory = jobMetaspaceMemory;
+ this.taskMemory = taskMemory;
+ this.taskMetaspaceMemory = taskMetaspaceMemory;
+ }
+ }
+}
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/LauncherRunnerApplication.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/LauncherRunnerApplication.java
new file mode 100644
index 0000000..60a50a1
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/LauncherRunnerApplication.java
@@ -0,0 +1,35 @@
+package com.lanyuanxiaoyao.service.launcher;
+
+import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.ComponentScans;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+/**
+ * 启动类
+ *
+ * @author ZhangJiacheng
+ * @date 2023-04-05
+ */
+@EnableDiscoveryClient
+@SpringBootApplication(exclude = {GsonAutoConfiguration.class})
+@ComponentScans({
+ @ComponentScan("com.lanyuanxiaoyao.service"),
+})
+@EnableScheduling
+@EnableConfigurationProperties
+@EnableEncryptableProperties
+public class LauncherRunnerApplication {
+ private static final Logger logger = LoggerFactory.getLogger(LauncherRunnerApplication.class);
+
+ public static void main(String[] args) {
+ SpringApplication.run(LauncherRunnerApplication.class, args);
+ }
+}
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/controller/CompactionController.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/controller/CompactionController.java
new file mode 100644
index 0000000..8e961c5
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/controller/CompactionController.java
@@ -0,0 +1,110 @@
+package com.lanyuanxiaoyao.service.launcher.compaction.controller;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import com.eshore.odcp.hudi.connector.Constants;
+import com.eshore.odcp.hudi.connector.entity.RunMeta;
+import com.eshore.odcp.hudi.connector.entity.SyncState;
+import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
+import com.lanyuanxiaoyao.service.forest.service.InfoService;
+import com.lanyuanxiaoyao.service.forest.service.ZookeeperService;
+import com.lanyuanxiaoyao.service.launcher.configuration.ClusterConfiguration;
+import java.io.IOException;
+import java.util.EnumSet;
+import javax.annotation.PreDestroy;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * 提供和集群绑定的一些操作
+ *
+ * @author ZhangJiacheng
+ * @date 2023-05-30
+ */
+@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
+@RestController
+@RequestMapping("launcher/compaction")
+public class CompactionController {
+ private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);
+
+ private final DiscoveryClient discoveryClient;
+ private final ClusterConfiguration clusterConfiguration;
+ private final InfoService infoService;
+ private final ZookeeperService zookeeperService;
+ private final YarnClient yarnClient;
+
+ public CompactionController(DiscoveryClient discoveryClient, ClusterConfiguration clusterConfiguration, InfoService infoService, ZookeeperService zookeeperService) {
+ this.discoveryClient = discoveryClient;
+ this.clusterConfiguration = clusterConfiguration;
+ this.infoService = infoService;
+ this.zookeeperService = zookeeperService;
+
+ yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(new Configuration());
+ yarnClient.start();
+ }
+
+ @PreDestroy
+ public void destroy() throws IOException {
+ if (ObjectUtil.isNotNull(yarnClient)) {
+ yarnClient.stop();
+ yarnClient.close();
+ }
+ }
+
+ @GetMapping("stop")
+ public void stop(
+ @RequestParam("flink_job_id") Long flinkJobId,
+ @RequestParam("alias") String alias,
+ @RequestParam(value = "disable_meta", defaultValue = "true") Boolean disableMeta
+ ) {
+ logger.info("Enter method: stop[flinkJobId, alias]. " + "flinkJobId:" + flinkJobId + "," + "alias:" + alias);
+ if (disableMeta) {
+ logger.info("Delete table info");
+ infoService.disableTable(flinkJobId, alias);
+ }
+ QueueUtil.remove(discoveryClient, clusterConfiguration.getCompactionQueueName(), StrUtil.format("{}-{}", flinkJobId, alias));
+ // 再删除一次,避免时间差导致任务重新放回预调度队列
+ QueueUtil.remove(discoveryClient, Constants.COMPACTION_QUEUE_PRE, StrUtil.format("{}-{}", flinkJobId, alias));
+ logger.info("Remove job from queue");
+ try {
+ RunMeta meta = zookeeperService.getSyncRunMeta(flinkJobId, alias);
+ if (ObjectUtil.isNotNull(meta) && ObjectUtil.isNotEmpty(meta.getApplicationId())) {
+ String applicationId = meta.getApplicationId();
+ stopApp(applicationId);
+ logger.info("Found application id and kill: {}", applicationId);
+ } else {
+ SyncState syncState = infoService.syncStateDetail(flinkJobId, alias);
+ if (ObjectUtil.isNotEmpty(syncState.getCompactionApplicationId())) {
+ String applicationId = syncState.getCompactionApplicationId();
+ stopApp(applicationId);
+ logger.info("Found application id and kill: {}", applicationId);
+ }
+ }
+ } catch (Throwable throwable) {
+ logger.warn("Application id not found; it may already be stopped");
+ }
+ }
+
+ @GetMapping("stop_app")
+ public void stopApp(@RequestParam("application_id") String applicationId) throws IOException, YarnException {
+ boolean exists = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING))
+ .stream()
+ .anyMatch(r -> StrUtil.equals(r.getApplicationId().toString(), applicationId));
+ if (exists) {
+ yarnClient.killApplication(ApplicationId.fromString(applicationId));
+ } else {
+ logger.info("Application: {} not running", applicationId);
+ }
+ }
+}
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java
new file mode 100644
index 0000000..781632b
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java
@@ -0,0 +1,274 @@
+package com.lanyuanxiaoyao.service.launcher.compaction.service;
+
+import cn.hutool.core.thread.ThreadUtil;
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import com.eshore.odcp.hudi.connector.Constants;
+import com.eshore.odcp.hudi.connector.entity.FlinkJob;
+import com.eshore.odcp.hudi.connector.entity.TableMeta;
+import com.eshore.odcp.hudi.connector.entity.compaction.ScheduleJob;
+import com.eshore.odcp.hudi.connector.utils.LogHelper;
+import com.eshore.odcp.hudi.connector.utils.NameHelper;
+import com.eshore.odcp.hudi.connector.utils.TableMetaHelper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
+import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
+import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
+import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
+import com.lanyuanxiaoyao.service.forest.service.HudiService;
+import com.lanyuanxiaoyao.service.forest.service.InfoService;
+import com.lanyuanxiaoyao.service.launcher.ExecutorService;
+import com.lanyuanxiaoyao.service.launcher.configuration.ClusterConfiguration;
+import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration;
+import com.lanyuanxiaoyao.service.launcher.configuration.ZookeeperConfiguration;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PreDestroy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.data.Stat;
+import org.eclipse.collections.api.factory.Lists;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.eclipse.collections.api.list.MutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author ZhangJiacheng
+ * @date 2023-05-08
+ */
+@Service
+public class CompactionService {
+ private static final Logger logger = LoggerFactory.getLogger(CompactionService.class);
+ private static final java.util.concurrent.ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
+ private static final RetryPolicy RETRY_POLICY = RetryPolicy.builder()
+ .handle(Exception.class)
+ .withDelay(Duration.ofSeconds(1))
+ .withMaxAttempts(3)
+ .build();
+ private static final int FILE_PER_PARALLELISM = 2;
+ private static final int MAX_PARALLELISM = 500;
+ private static final int MIN_PARALLELISM = 10;
+ private final HadoopConfiguration hadoopConfiguration;
+ private final ClusterConfiguration clusterConfiguration;
+ private final DiscoveryClient discoveryClient;
+ private final CuratorFramework zookeeperClient;
+ private final InfoService infoService;
+ private final HudiService hudiService;
+ private final ExecutorService executorService;
+ private final ObjectMapper mapper;
+
+ @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
+ public CompactionService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ZookeeperConfiguration zookeeperConfiguration, DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ExecutorService executorService, Jackson2ObjectMapperBuilder builder) {
+ this.hadoopConfiguration = hadoopConfiguration;
+ this.clusterConfiguration = clusterConfiguration;
+ this.discoveryClient = discoveryClient;
+ this.infoService = infoService;
+ this.hudiService = hudiService;
+ this.executorService = executorService;
+ mapper = builder.build();
+ this.zookeeperClient = CuratorFrameworkFactory.builder()
+ .connectString(zookeeperConfiguration.getConnectUrl())
+ .retryPolicy(new ExponentialBackoffRetry((int) Constants.SECOND, 3))
+ .sessionTimeoutMs((int) Constants.SECOND)
+ .connectionTimeoutMs((int) (30 * Constants.SECOND))
+ .build();
+ this.zookeeperClient.start();
+ }
+
+ @PreDestroy
+ public void destroy() {
+ if (ObjectUtil.isNotNull(this.zookeeperClient)) {
+ this.zookeeperClient.close();
+ }
+ }
+
+ @Scheduled(fixedDelay = 5, initialDelay = 30, timeUnit = TimeUnit.SECONDS)
+ public void scheduleCompact() {
+ java.util.concurrent.ExecutorService threadPool = Executors.newWorkStealingPool(5);
+ if (Boolean.FALSE.equals(QueueUtil.isEmpty(discoveryClient, clusterConfiguration.getCompactionQueueName()))) {
+ for (int index = 0; index < 5; index++) {
+ threadPool.submit(this::compact);
+ }
+ threadPool.shutdown();
+ while (!threadPool.isTerminated()) {
+ ThreadUtil.safeSleep(10 * Constants.SECOND);
+ }
+ logger.info("Finish compaction schedule");
+ }
+ }
+
+ @SuppressWarnings("DataFlowIssue")
+ private void compact() {
+ // 每次把由于各种原因无法运行的任务先放在暂存 list 里面,直到遇到一个能执行的任务,就把这批任务送回队列
+ MutableList> holder = Lists.mutable.empty();
+
+ QueueItem item = QueueUtil.poll(discoveryClient, this.mapper, clusterConfiguration.getCompactionQueueName());
+ // 检查是否获取到新的任务,如果队列为空,获取到的任务也是空
+ while (ObjectUtil.isNotNull(item)) {
+ LogHelper.setMdc(Constants.LOG_JOB_ID_LABEL, item.getTraceId());
+ try {
+ // 检查任务内容是否存在
+ if (ObjectUtil.isNotNull(item.getData())) {
+ ScheduleJob job = item.getData();
+ LogHelper.setMdcFlinkJobAndAlias(job.getFlinkJobId(), job.getAlias());
+ logger.info("Receive job[{}]({}): {}", item.getTraceId(), item.getCreateTime(), item.getData());
+ // 构造任务相关的锁
+ String lockPath = NameHelper.compactionLauncherLockPath(job.getFlinkJobId(), job.getAlias());
+ InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath);
+ try {
+ if (lock.acquire(2, TimeUnit.SECONDS)) {
+ Stat stat = zookeeperClient.checkExists().forPath(NameHelper.compactionRunningLockPath(job.getFlinkJobId(), job.getAlias()));
+ if (ObjectUtil.isNotNull(stat)) {
+ logger.info("Job {} {} is running", job.getFlinkJobId(), job.getAlias());
+ // 运行中的任务放在持有容器中
+ holder.add(item);
+ // 进入下一轮,由于最外层有一个 finally,所以直接 continue 也会尝试获取新的任务
+ continue;
+ }
+ FlinkJob flinkJob = infoService.flinkJobDetail(job.getFlinkJobId());
+ TableMeta meta = infoService.tableMetaDetail(job.getFlinkJobId(), job.getAlias());
+ if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) {
+ logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias());
+ clearHolder(holder);
+ continue;
+ }
+ logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias());
+ // 判断是否存在 Hudi 表,提前结束掉
+ if (!hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias())) {
+ logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias());
+ clearHolder(holder);
+ continue;
+ }
+ // 获取待压缩的时间点
+ ImmutableList selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias());
+ if (ObjectUtil.isEmpty(selectedInstants)) {
+ logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias());
+ clearHolder(holder);
+ continue;
+ }
+ logger.info("[{}] [{}] Selected Instants: {}", flinkJob.getId(), meta.getAlias(), selectedInstants.makeString(","));
+ // 计算待压缩的文件数
+ long count = predictCompactFileCount(meta, selectedInstants);
+ if (ObjectUtil.isNotNull(count)) {
+ // 根据待压缩的文件数计算并行度
+ long parallelism = predictParallelism(count);
+ logger.info("[{}] [{}] Predict compact files: {} {}", flinkJob.getId(), meta.getAlias(), count, parallelism);
+ meta.getHudi().setCompactionTasks((int) parallelism);
+ }
+ logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias());
+ String applicationId = Failsafe.with(RETRY_POLICY)
+ .get(() -> executorService.runCompaction(
+ job.getBatch(),
+ flinkJob,
+ meta,
+ hadoopConfiguration.getKerberosKeytabPath(),
+ hadoopConfiguration.getKerberosPrincipal(),
+ selectedInstants.collect(HudiInstant::getTimestamp).makeString(","),
+ clusterConfiguration.getCluster()
+ ).toString());
+ Failsafe.with(RETRY_POLICY)
+ .run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId));
+ clearHolder(holder);
+ } else {
+ logger.warn("Unable to acquire lock for " + item.getId());
+ holder.add(item);
+ }
+ } catch (Exception e) {
+ logger.warn(StrUtil.format("[{}] [{}] Try lock something wrong ", job.getFlinkJobId(), job.getAlias()), e);
+ String failCount = item.getMetadata(Constants.SCHEDULE_JOB_FAIL_COUNT);
+ if (StrUtil.isNotBlank(failCount)) {
+ int fail = Integer.parseInt(failCount);
+ if (fail > 5) {
+ logger.error("Job {} cause unaccepted error", item);
+ continue;
+ } else {
+ item.getMetadata().put(Constants.SCHEDULE_JOB_FAIL_COUNT, String.valueOf(fail + 1));
+ }
+ } else {
+ item.getMetadata().put(Constants.SCHEDULE_JOB_FAIL_COUNT, "1");
+ }
+ QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, item);
+ } finally {
+ // 无论如何,尝试解锁
+ try {
+ if (lock.isAcquiredInThisProcess()) {
+ lock.release();
+ }
+ } catch (Exception e) {
+ logger.error("Release lock failure " + lockPath, e);
+ }
+ }
+ } else {
+ logger.warn("Schedule job is empty. [{}]({}): {}", item.getTraceId(), item.getCreateTime(), item);
+ }
+ } finally {
+ // 无论如何尝试获取下个任务
+ item = QueueUtil.poll(discoveryClient, this.mapper, clusterConfiguration.getCompactionQueueName());
+ LogHelper.removeMdc(Constants.LOG_JOB_ID_LABEL, Constants.LOG_FLINK_JOB_ID_LABEL, Constants.LOG_ALIAS_LABEL);
+ }
+ }
+ clearHolder(holder);
+ }
+
+ private void clearHolder(MutableList> holder) {
+ if (holder.isEmpty()) {
+ return;
+ }
+ // 等待半分钟
+ ThreadUtil.safeSleep(Constants.HALF_MINUTE);
+ for (QueueItem item : holder) {
+ QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, item);
+ }
+ holder.clear();
+ }
+
+ private Long predictCompactFileCount(TableMeta meta, ImmutableList selectedInstants) {
+ long count = selectedInstants.asParallel(EXECUTOR, 1)
+ .sumOfLong(instant -> {
+ try {
+ HudiCompactionPlan plan = hudiService.readCompactionPlan(meta.getJob().getId(), meta.getAlias(), instant.getTimestamp());
+ return plan.getOperations().size();
+ } catch (Exception e) {
+ logger.error("Read compaction plan failure", e);
+ }
+ return 0;
+ });
+ return count < 1 ? meta.getHudi().getBucketIndexNumber() : count;
+ }
+
+ private Integer predictParallelism(Long count) {
+ long parallelism = Math.round(count * 1.0 / FILE_PER_PARALLELISM);
+ if (parallelism > MAX_PARALLELISM) {
+ parallelism = MAX_PARALLELISM;
+ }
+ if (parallelism < MIN_PARALLELISM) {
+ parallelism = MIN_PARALLELISM;
+ }
+ return Math.toIntExact(parallelism);
+ }
+
+ private QueueItem deserialize(String body) {
+ if (StrUtil.isBlank(body)) {
+ return null;
+ }
+ try {
+ return mapper.readValue(body, new TypeReference>() {
+ });
+ } catch (Throwable error) {
+ logger.error("Schedule job parse error", error);
+ return null;
+ }
+ }
+}
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/ClusterConfiguration.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/ClusterConfiguration.java
new file mode 100644
index 0000000..4141132
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/ClusterConfiguration.java
@@ -0,0 +1,61 @@
+package com.lanyuanxiaoyao.service.launcher.configuration;
+
+import javax.annotation.PostConstruct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * 集群相关配置
+ *
+ * @author ZhangJiacheng
+ * @date 2023-05-09
+ */
+@ConfigurationProperties("connector.cluster")
+@Component
+public class ClusterConfiguration {
+ private static final Logger logger = LoggerFactory.getLogger(ClusterConfiguration.class);
+
+ private String cluster;
+ private String syncQueueName;
+ private String compactionQueueName;
+
+ @PostConstruct
+ private void init() {
+ logger.info("Configuration initial: {}", this);
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+
+ public String getSyncQueueName() {
+ return syncQueueName;
+ }
+
+ public void setSyncQueueName(String syncQueueName) {
+ this.syncQueueName = syncQueueName;
+ }
+
+ public String getCompactionQueueName() {
+ return compactionQueueName;
+ }
+
+ public void setCompactionQueueName(String compactionQueueName) {
+ this.compactionQueueName = compactionQueueName;
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterConfiguration{" +
+ "cluster='" + cluster + '\'' +
+ ", syncQueueName='" + syncQueueName + '\'' +
+ ", compactionQueueName='" + compactionQueueName + '\'' +
+ '}';
+ }
+}
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HadoopConfiguration.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HadoopConfiguration.java
new file mode 100644
index 0000000..5645589
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HadoopConfiguration.java
@@ -0,0 +1,51 @@
+package com.lanyuanxiaoyao.service.launcher.configuration;
+
+import javax.annotation.PostConstruct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * yarn 配置
+ *
+ * @author ZhangJiacheng
+ * @date 2022-03-30
+ */
+@ConfigurationProperties("connector.hadoop")
+@Component
+public class HadoopConfiguration {
+ private static final Logger logger = LoggerFactory.getLogger(HadoopConfiguration.class);
+
+ private String kerberosPrincipal;
+ private String kerberosKeytabPath;
+
+ @PostConstruct
+ private void init() {
+ logger.info("Configuration initial: {}", this);
+ }
+
+ public String getKerberosPrincipal() {
+ return kerberosPrincipal;
+ }
+
+ public void setKerberosPrincipal(String kerberosPrincipal) {
+ this.kerberosPrincipal = kerberosPrincipal;
+ }
+
+ public String getKerberosKeytabPath() {
+ return kerberosKeytabPath;
+ }
+
+ public void setKerberosKeytabPath(String kerberosKeytabPath) {
+ this.kerberosKeytabPath = kerberosKeytabPath;
+ }
+
+ @Override
+ public String toString() {
+ return "HadoopConfiguration{" +
+ "kerberosPrincipal='" + kerberosPrincipal + '\'' +
+ ", kerberosKeytabPath='" + kerberosKeytabPath + '\'' +
+ '}';
+ }
+}
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HudiConfiguration.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HudiConfiguration.java
new file mode 100644
index 0000000..c76495a
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HudiConfiguration.java
@@ -0,0 +1,61 @@
+package com.lanyuanxiaoyao.service.launcher.configuration;
+
+import javax.annotation.PostConstruct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * Hudi 配置
+ *
+ * @author ZhangJiacheng
+ * @date 2023-02-03
+ */
+@ConfigurationProperties("connector.hudi")
+@Component
+public class HudiConfiguration {
+ private static final Logger logger = LoggerFactory.getLogger(HudiConfiguration.class);
+
+ private String appHdfsPath;
+ private String victoriaPushUrl;
+ private String lokiPushUrl;
+
+ @PostConstruct
+ private void init() {
+ logger.info("Configuration initial: {}", this);
+ }
+
+ public String getAppHdfsPath() {
+ return appHdfsPath;
+ }
+
+ public void setAppHdfsPath(String appHdfsPath) {
+ this.appHdfsPath = appHdfsPath;
+ }
+
+ public String getVictoriaPushUrl() {
+ return victoriaPushUrl;
+ }
+
+ public void setVictoriaPushUrl(String victoriaPushUrl) {
+ this.victoriaPushUrl = victoriaPushUrl;
+ }
+
+ public String getLokiPushUrl() {
+ return lokiPushUrl;
+ }
+
+ public void setLokiPushUrl(String lokiPushUrl) {
+ this.lokiPushUrl = lokiPushUrl;
+ }
+
+ @Override
+ public String toString() {
+ return "HudiConfiguration{" +
+ "appHdfsPath='" + appHdfsPath + '\'' +
+ ", victoriaPushUrl='" + victoriaPushUrl + '\'' +
+ ", lokiPushUrl='" + lokiPushUrl + '\'' +
+ '}';
+ }
+}
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/ZookeeperConfiguration.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/ZookeeperConfiguration.java
new file mode 100644
index 0000000..505a6dd
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/ZookeeperConfiguration.java
@@ -0,0 +1,41 @@
+package com.lanyuanxiaoyao.service.launcher.configuration;
+
+import javax.annotation.PostConstruct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * Zookeeper 配置
+ *
+ * @author ZhangJiacheng
+ * @date 2023-05-04
+ */
+@ConfigurationProperties("connector.zookeeper")
+@Component
+public class ZookeeperConfiguration {
+ private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfiguration.class);
+
+ private String connectUrl;
+
+ @PostConstruct
+ private void init() {
+ logger.info("Configuration initial: {}", this);
+ }
+
+ public String getConnectUrl() {
+ return connectUrl;
+ }
+
+ public void setConnectUrl(String connectUrl) {
+ this.connectUrl = connectUrl;
+ }
+
+ @Override
+ public String toString() {
+ return "ZookeeperConfiguration{" +
+ "connectUrl='" + connectUrl + '\'' +
+ '}';
+ }
+}
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/controller/SynchronizerController.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/controller/SynchronizerController.java
new file mode 100644
index 0000000..26cdb8f
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/controller/SynchronizerController.java
@@ -0,0 +1,40 @@
+package com.lanyuanxiaoyao.service.launcher.synchronizer.controller;
+
+import com.lanyuanxiaoyao.service.launcher.synchronizer.service.SynchronizerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * 同步控制
+ *
+ * @author ZhangJiacheng
+ * @date 2023-12-07
+ */
+@RestController
+@RequestMapping("launcher/synchronizer")
+public class SynchronizerController {
+ private static final Logger logger = LoggerFactory.getLogger(SynchronizerController.class);
+
+ private final SynchronizerService synchronizerService;
+
+ public SynchronizerController(@Qualifier("sync-service") SynchronizerService synchronizerService) {
+ this.synchronizerService = synchronizerService;
+ }
+
+ @GetMapping("start")
+ public String start(@RequestParam("flink_job_id") Long flinkJobId) throws Exception {
+ logger.info("Try to start job: {}", flinkJobId);
+ return synchronizerService.start(flinkJobId);
+ }
+
+ @GetMapping("stop")
+ public void stop(@RequestParam("flink_job_id") Long flinkJobId) throws Exception {
+ logger.info("Try to stop job: {}", flinkJobId);
+ synchronizerService.stop(flinkJobId);
+ }
+}
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/service/SynchronizerService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/service/SynchronizerService.java
new file mode 100644
index 0000000..c81f4b4
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/service/SynchronizerService.java
@@ -0,0 +1,137 @@
+package com.lanyuanxiaoyao.service.launcher.synchronizer.service;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import com.eshore.odcp.hudi.connector.Constants;
+import com.eshore.odcp.hudi.connector.entity.RunMeta;
+import com.eshore.odcp.hudi.connector.utils.NameHelper;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.lanyuanxiaoyao.service.launcher.ExecutorService;
+import com.lanyuanxiaoyao.service.launcher.configuration.ClusterConfiguration;
+import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration;
+import com.lanyuanxiaoyao.service.launcher.configuration.ZookeeperConfiguration;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PreDestroy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
+import org.springframework.stereotype.Service;
+
+/**
+ * Sync 服务
+ *
+ * @author ZhangJiacheng
+ * @date 2023-12-07
+ */
+@Service("sync-service")
+public class SynchronizerService {
+ private static final Logger logger = LoggerFactory.getLogger(SynchronizerService.class);
+ private static final RetryPolicy RETRY_POLICY = RetryPolicy.builder()
+ .handle(Exception.class)
+ .withDelay(Duration.ofSeconds(1))
+ .withMaxAttempts(3)
+ .build();
+
+ private final HadoopConfiguration hadoopConfiguration;
+ private final ClusterConfiguration clusterConfiguration;
+ private final CuratorFramework zookeeperClient;
+ private final ExecutorService executorService;
+ private final YarnClient yarnClient;
+ private final ObjectMapper mapper;
+
+ public SynchronizerService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ExecutorService executorService, ZookeeperConfiguration zookeeperConfiguration, Jackson2ObjectMapperBuilder builder) {
+ this.hadoopConfiguration = hadoopConfiguration;
+ this.clusterConfiguration = clusterConfiguration;
+ this.executorService = executorService;
+
+ mapper = builder.build();
+
+ yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(new Configuration());
+ yarnClient.start();
+
+ zookeeperClient = CuratorFrameworkFactory.builder()
+ .connectString(zookeeperConfiguration.getConnectUrl())
+ .retryPolicy(new ExponentialBackoffRetry((int) Constants.SECOND, 3))
+ .sessionTimeoutMs((int) Constants.SECOND)
+ .connectionTimeoutMs((int) (30 * Constants.SECOND))
+ .build();
+ zookeeperClient.start();
+ }
+
+ @PreDestroy
+ public void destroy() throws IOException {
+ if (ObjectUtil.isNotNull(yarnClient)) {
+ yarnClient.stop();
+ yarnClient.close();
+ }
+
+ if (ObjectUtil.isNotNull(this.zookeeperClient)) {
+ this.zookeeperClient.close();
+ }
+ }
+
+ public String start(Long flinkJobId) throws Exception {
+ String lockPath = NameHelper.syncLauncherLockPath(flinkJobId);
+ InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath);
+ try {
+ if (lock.acquire(2, TimeUnit.SECONDS)) {
+ Stat stat = zookeeperClient.checkExists().forPath(NameHelper.syncRunningLockPath(flinkJobId, null));
+ if (ObjectUtil.isNotNull(stat)) {
+ String message = StrUtil.format("Job {} is running", flinkJobId);
+ logger.warn(message);
+ throw new RuntimeException(message);
+ }
+ ApplicationId applicationId = Failsafe
+ .with(RETRY_POLICY)
+ .get(() -> executorService.runSync(
+ flinkJobId,
+ hadoopConfiguration.getKerberosKeytabPath(),
+ hadoopConfiguration.getKerberosPrincipal(),
+ clusterConfiguration.getCluster()
+ ));
+ return applicationId.toString();
+ } else {
+ String message = StrUtil.format("Unable to acquire lock for {}", flinkJobId);
+ logger.warn(message);
+ throw new RuntimeException(message);
+ }
+ } finally {
+ if (lock.isAcquiredInThisProcess()) {
+ lock.release();
+ }
+ }
+ }
+
+ public void stop(Long flinkJobId) throws Exception {
+ String syncLockPath = NameHelper.syncRunningLockPath(flinkJobId, null);
+ Stat stat = zookeeperClient.checkExists().forPath(syncLockPath);
+ if (ObjectUtil.isNull(stat)) {
+ return;
+ }
+ byte[] bytes = zookeeperClient.getData().forPath(syncLockPath);
+ RunMeta meta = mapper.readValue(new String(bytes), RunMeta.class);
+ logger.info("Run meta: {}", meta);
+ String applicationIdText = meta.getApplicationId();
+ if (StrUtil.isBlank(applicationIdText)) {
+ throw new RuntimeException(StrUtil.format("Cannot parse application id from zookeeper: {}", syncLockPath));
+ }
+ ApplicationId applicationId = ApplicationId.fromString(applicationIdText);
+ Failsafe.with(RETRY_POLICY)
+ .run(() -> yarnClient.killApplication(applicationId));
+ logger.info("{} has been killed", applicationId);
+ }
+}
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/HadoopUtil.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/HadoopUtil.java
new file mode 100644
index 0000000..ca26a73
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/HadoopUtil.java
@@ -0,0 +1,64 @@
+package com.lanyuanxiaoyao.service.launcher.utils;
+
+import cn.hutool.core.util.ObjectUtil;
+import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.eclipse.collections.api.factory.Lists;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hadoop 工具类
+ *
+ * @author ZhangJiacheng
+ * @date 2022-04-25
+ */
+/**
+ * Utility methods for building Hadoop {@link Configuration} and {@link FileSystem}
+ * instances, with optional Kerberos keytab login.
+ */
+public class HadoopUtil {
+    private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);
+
+    /**
+     * Creates a default Hadoop {@link Configuration} without any Kerberos login.
+     *
+     * @return a fresh default {@link Configuration}
+     * @throws IOException if the Kerberos login path is taken and fails (not possible here,
+     *                     but kept for signature symmetry with the overload)
+     */
+    public static Configuration createConfiguration() throws IOException {
+        return createConfiguration(null);
+    }
+
+    /**
+     * Creates a Hadoop {@link Configuration}; when {@code configuration} is non-null and
+     * Hadoop security is enabled, performs a Kerberos login from the configured keytab.
+     *
+     * <p>NOTE(review): only the Kerberos principal/keytab fields of
+     * {@code HadoopConfiguration} are consulted here — presumably other settings are
+     * applied elsewhere; confirm against callers.
+     *
+     * @param configuration optional service-level Hadoop settings; may be null
+     * @return the created {@link Configuration}
+     * @throws IOException if the Kerberos keytab login fails
+     */
+    public static Configuration createConfiguration(HadoopConfiguration configuration) throws IOException {
+        Configuration hadoopConfiguration = new Configuration();
+        if (ObjectUtil.isNull(configuration)) {
+            return hadoopConfiguration;
+        }
+        UserGroupInformation.setConfiguration(hadoopConfiguration);
+        if (UserGroupInformation.isSecurityEnabled()) {
+            UserGroupInformation.loginUserFromKeytab(configuration.getKerberosPrincipal(), configuration.getKerberosKeytabPath());
+        }
+        return hadoopConfiguration;
+    }
+
+    /**
+     * Creates a {@link FileSystem} from a default {@link Configuration}.
+     *
+     * @return the default file system
+     */
+    public static FileSystem createFileSystem() {
+        return createFileSystem(null);
+    }
+
+    /**
+     * Creates a {@link FileSystem} from the given configuration (or a default one if null).
+     *
+     * @param configuration Hadoop configuration to use; may be null for defaults
+     * @return the resolved file system
+     * @throws RuntimeException wrapping any {@link IOException} from {@link FileSystem#get}
+     */
+    public static FileSystem createFileSystem(Configuration configuration) {
+        try {
+            if (ObjectUtil.isNull(configuration)) {
+                return FileSystem.get(new Configuration());
+            }
+            return FileSystem.get(configuration);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Lists the direct children of {@code path}, returning an empty list on I/O error.
+     *
+     * <p>Fixed: the return type was a raw {@code ImmutableList}; it is now parameterized
+     * with {@link FileStatus}, which is what {@link FileSystem#listStatus} yields.
+     *
+     * @param fileSystem file system to query
+     * @param path       directory (or file) to list
+     * @return immutable list of child statuses; empty when listing fails (error is logged)
+     */
+    public static ImmutableList<FileStatus> list(FileSystem fileSystem, Path path) {
+        try {
+            return Lists.immutable.of(fileSystem.listStatus(path));
+        } catch (IOException e) {
+            logger.error("List error", e);
+            return Lists.immutable.empty();
+        }
+    }
+}
diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/JacksonUtil.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/JacksonUtil.java
new file mode 100644
index 0000000..913391e
--- /dev/null
+++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/JacksonUtil.java
@@ -0,0 +1,30 @@
+package com.lanyuanxiaoyao.service.launcher.utils;
+
+import cn.hutool.core.util.ObjectUtil;
+import java.io.Serializable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Json 解析相关工具
+ *
+ * @author ZhangJiacheng
+ * @date 2022-06-12
+ */
+public class JacksonUtil implements Serializable {
+ private static final Logger logger = LoggerFactory.getLogger(JacksonUtil.class);
+
+ private static ObjectMapper INSTANCE = null;
+
+ public static ObjectMapper getMapper() {
+ if (ObjectUtil.isNull(INSTANCE)) {
+ INSTANCE = new ObjectMapper();
+ INSTANCE.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
+ INSTANCE.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+ return INSTANCE;
+ }
+}
diff --git a/service-launcher/src/main/resources/application.yml b/service-launcher/src/main/resources/application.yml
new file mode 100644
index 0000000..d81e0f4
--- /dev/null
+++ b/service-launcher/src/main/resources/application.yml
@@ -0,0 +1,7 @@
+spring:
+ application:
+ name: service-launcher
+ profiles:
+ include: random-port,common,discovery,metrics,forest
+forest:
+ backend: httpclient
\ No newline at end of file
diff --git a/service-launcher/src/main/resources/logback-spring.xml b/service-launcher/src/main/resources/logback-spring.xml
new file mode 100644
index 0000000..53f79f0
--- /dev/null
+++ b/service-launcher/src/main/resources/logback-spring.xml
@@ -0,0 +1,53 @@
+
+
+
+
+
+
+
+
+
+
+
+ true
+
+ ${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push}
+
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx
+
+ true
+
+
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %clr(%5p) %clr([${HOSTNAME}]){yellow} %clr([%t]){magenta} %clr(%logger{40}){cyan} #@# %m%n%wEx
+
+
+
+
+ ${LOGGING_PARENT:-.}/${APP_NAME:-run}.log
+
+ ${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz
+ 7
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/service-launcher/src/main/resources/services/org.apache.flink.client.deployment.ClusterClientFactory b/service-launcher/src/main/resources/services/org.apache.flink.client.deployment.ClusterClientFactory
new file mode 100644
index 0000000..ea1c4de
--- /dev/null
+++ b/service-launcher/src/main/resources/services/org.apache.flink.client.deployment.ClusterClientFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.YarnClusterClientFactory
\ No newline at end of file
diff --git a/service-launcher/src/main/resources/services/org.apache.flink.core.execution.PipelineExecutorFactory b/service-launcher/src/main/resources/services/org.apache.flink.core.execution.PipelineExecutorFactory
new file mode 100644
index 0000000..d56f8c5
--- /dev/null
+++ b/service-launcher/src/main/resources/services/org.apache.flink.core.execution.PipelineExecutorFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
+org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory
\ No newline at end of file
diff --git a/service-launcher/src/main/resources/services/org.apache.flink.metrics.reporter.MetricReporterFactory b/service-launcher/src/main/resources/services/org.apache.flink.metrics.reporter.MetricReporterFactory
new file mode 100644
index 0000000..e16bdb6
--- /dev/null
+++ b/service-launcher/src/main/resources/services/org.apache.flink.metrics.reporter.MetricReporterFactory
@@ -0,0 +1 @@
+com.eshore.odcp.hudi.connector.utils.executor.metrics.VictoriaMetricsReporterFactory
\ No newline at end of file
diff --git a/service-launcher/src/main/resources/services/org.apache.flink.table.factories.Factory b/service-launcher/src/main/resources/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..ac5f9b6
--- /dev/null
+++ b/service-launcher/src/main/resources/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.planner.delegation.DefaultExecutorFactory
+org.apache.flink.table.planner.delegation.DefaultParserFactory
+org.apache.flink.table.planner.delegation.DefaultPlannerFactory