From 474ee6173dc9a9617c71306c96db0236642d933e Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 28 Feb 2024 18:37:12 +0800 Subject: [PATCH] =?UTF-8?q?feat(scheduler):=20=E8=BF=81=E7=A7=BBservice-la?= =?UTF-8?q?uncher=E9=A1=B9=E7=9B=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bin/build-launcher.sh | 20 + pom.xml | 36 +- .../src/main/resources/application-b12.yml | 8 +- .../src/main/resources/application.yml | 24 +- .../launcher/impl/A4LauncherService.java | 2 +- .../launcher/impl/B12LauncherService.java | 2 +- .../launcher/impl/B1LauncherService.java | 2 +- .../launcher/impl/B5LauncherService.java | 2 +- service-launcher/pom.xml | 176 +++++++++ .../service/launcher/ExecutorService.java | 355 ++++++++++++++++++ .../launcher/LauncherRunnerApplication.java | 35 ++ .../controller/CompactionController.java | 110 ++++++ .../compaction/service/CompactionService.java | 274 ++++++++++++++ .../configuration/ClusterConfiguration.java | 61 +++ .../configuration/HadoopConfiguration.java | 51 +++ .../configuration/HudiConfiguration.java | 61 +++ .../configuration/ZookeeperConfiguration.java | 41 ++ .../controller/SynchronizerController.java | 40 ++ .../service/SynchronizerService.java | 137 +++++++ .../service/launcher/utils/HadoopUtil.java | 64 ++++ .../service/launcher/utils/JacksonUtil.java | 30 ++ .../src/main/resources/application.yml | 7 + .../src/main/resources/logback-spring.xml | 53 +++ ...ink.client.deployment.ClusterClientFactory | 16 + ...ink.core.execution.PipelineExecutorFactory | 17 + ...ink.metrics.reporter.MetricReporterFactory | 1 + .../org.apache.flink.table.factories.Factory | 18 + 27 files changed, 1599 insertions(+), 44 deletions(-) create mode 100755 bin/build-launcher.sh create mode 100644 service-launcher/pom.xml create mode 100644 service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java create mode 100644 
service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/LauncherRunnerApplication.java create mode 100644 service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/controller/CompactionController.java create mode 100644 service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java create mode 100644 service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/ClusterConfiguration.java create mode 100644 service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HadoopConfiguration.java create mode 100644 service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HudiConfiguration.java create mode 100644 service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/ZookeeperConfiguration.java create mode 100644 service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/controller/SynchronizerController.java create mode 100644 service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/service/SynchronizerService.java create mode 100644 service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/HadoopUtil.java create mode 100644 service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/JacksonUtil.java create mode 100644 service-launcher/src/main/resources/application.yml create mode 100644 service-launcher/src/main/resources/logback-spring.xml create mode 100644 service-launcher/src/main/resources/services/org.apache.flink.client.deployment.ClusterClientFactory create mode 100644 service-launcher/src/main/resources/services/org.apache.flink.core.execution.PipelineExecutorFactory create mode 100644 service-launcher/src/main/resources/services/org.apache.flink.metrics.reporter.MetricReporterFactory create mode 100644 service-launcher/src/main/resources/services/org.apache.flink.table.factories.Factory diff --git a/bin/build-launcher.sh 
b/bin/build-launcher.sh new file mode 100755 index 0000000..0f7fd92 --- /dev/null +++ b/bin/build-launcher.sh @@ -0,0 +1,20 @@ +#!/bin/bash +echo 'Build b2a4' +mvn -pl service-launcher clean package -D skipTests -s ~/.m2/settings-development.xml -P b2a4 +ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2a4-1.0.0-SNAPSHOT.jar +rm /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2a4-1.0.0-SNAPSHOT.jar + +echo 'Build b2b1' +mvn -pl service-launcher clean package -D skipTests -s ~/.m2/settings-development.xml -P b2b1 +ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b1-1.0.0-SNAPSHOT.jar +rm /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b1-1.0.0-SNAPSHOT.jar + +echo 'Build b2b5' +mvn -pl service-launcher clean package -D skipTests -s ~/.m2/settings-development.xml -P b2b5 +ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b5-1.0.0-SNAPSHOT.jar +rm /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b5-1.0.0-SNAPSHOT.jar + +echo 'Build b2b12' +mvn -pl service-launcher clean package -D skipTests -s ~/.m2/settings-development.xml -P b2b12 +ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b12-1.0.0-SNAPSHOT.jar +rm /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-launcher/target/service-launcher-b2b12-1.0.0-SNAPSHOT.jar diff --git a/pom.xml b/pom.xml index 64a28c3..48e000f 100644 --- a/pom.xml +++ b/pom.xml @@ -30,6 +30,7 @@ service-check service-api service-scheduler + service-launcher @@ -50,33 +51,21 @@ - b1e1 + b2a4 - b1e1 + b2a4 - b1e11 + b2b1 - b1e11 + b2b1 - b2e1 + b2b5 - b2e1 - - - - b5s119 - - b5s119 - - - - b2s119 - - b2s119 + b2b5 @@ -85,12 +74,6 @@ 
b2b12 - - b9b9 - - b9b9 - - @@ -181,6 +164,11 @@ maven-source-plugin 3.3.0 + + org.apache.maven.plugins + maven-shade-plugin + 3.3.0 + org.springframework.boot spring-boot-maven-plugin diff --git a/service-cli/service-cli-runner/src/main/resources/application-b12.yml b/service-cli/service-cli-runner/src/main/resources/application-b12.yml index c0d4cca..9e35808 100644 --- a/service-cli/service-cli-runner/src/main/resources/application-b12.yml +++ b/service-cli/service-cli-runner/src/main/resources/application-b12.yml @@ -32,13 +32,13 @@ deploy: services: service-api: replicas: 10 - service-launcher-runner-b1: + service-launcher-b1: replicas: 6 - service-launcher-runner-b5: + service-launcher-b5: replicas: 6 - service-launcher-runner-a4: + service-launcher-a4: replicas: 6 - service-launcher-runner-b12: + service-launcher-b12: replicas: 6 service-info-query: replicas: 10 diff --git a/service-cli/service-cli-runner/src/main/resources/application.yml b/service-cli/service-cli-runner/src/main/resources/application.yml index 92c7f66..2768a08 100644 --- a/service-cli/service-cli-runner/src/main/resources/application.yml +++ b/service-cli/service-cli-runner/src/main/resources/application.yml @@ -20,13 +20,13 @@ deploy: - "service-hudi" source-jar: service-scheduler-1.0.0-SNAPSHOT.jar replicas: 1 - service-launcher-runner-b1: + service-launcher-b1: order: 4 groups: - "service" - "service-hudi" - "service-hudi-launcher" - source-jar: service-launcher-runner-b2b1-1.0.0-SNAPSHOT.jar + source-jar: service-launcher-b2b1-1.0.0-SNAPSHOT.jar replicas: 6 environments: connector_hadoop_kerberos-principal: ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM @@ -35,18 +35,18 @@ deploy: connector_hudi_victoria-push-url: ${deploy.runtime.hudi.victoria-push-url} connector_hudi_loki-push-url: ${deploy.runtime.hudi.loki-push-url} arguments: - spring_application_name: service-launcher-runner-b1 + spring_application_name: service-launcher-b1 connector_cluster_cluster: b1 
connector_cluster_sync-queue-name: sync-queue-b1 connector_cluster_compaction-queue-name: compaction-queue-b1 connector_zookeeper_connect-url: ${deploy.runtime.connector-zk-url} - service-launcher-runner-b5: + service-launcher-b5: order: 4 groups: - "service" - "service-hudi" - "service-hudi-launcher" - source-jar: service-launcher-runner-b2b5-1.0.0-SNAPSHOT.jar + source-jar: service-launcher-b2b5-1.0.0-SNAPSHOT.jar replicas: 6 environments: connector_hadoop_kerberos-principal: ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM @@ -55,18 +55,18 @@ deploy: connector_hudi_victoria-push-url: ${deploy.runtime.hudi.victoria-push-url} connector_hudi_loki-push-url: ${deploy.runtime.hudi.loki-push-url} arguments: - spring_application_name: service-launcher-runner-b5 + spring_application_name: service-launcher-b5 connector_cluster_cluster: b5 connector_cluster_sync-queue-name: sync-queue-b5 connector_cluster_compaction-queue-name: compaction-queue-b5 connector_zookeeper_connect-url: ${deploy.runtime.connector-zk-url} - service-launcher-runner-a4: + service-launcher-a4: order: 4 groups: - "service" - "service-hudi" - "service-hudi-launcher" - source-jar: service-launcher-runner-b2a4-1.0.0-SNAPSHOT.jar + source-jar: service-launcher-b2a4-1.0.0-SNAPSHOT.jar replicas: 6 environments: connector_hadoop_kerberos-principal: ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM @@ -75,18 +75,18 @@ deploy: connector_hudi_victoria-push-url: ${deploy.runtime.hudi.victoria-push-url} connector_hudi_loki-push-url: ${deploy.runtime.hudi.loki-push-url} arguments: - spring_application_name: service-launcher-runner-a4 + spring_application_name: service-launcher-a4 connector_cluster_cluster: a4 connector_cluster_sync-queue-name: sync-queue-a4 connector_cluster_compaction-queue-name: compaction-queue-a4 connector_zookeeper_connect-url: ${deploy.runtime.connector-zk-url} - service-launcher-runner-b12: + service-launcher-b12: order: 4 groups: - "service" - "service-hudi" - "service-hudi-launcher" 
- source-jar: service-launcher-runner-b2b12-1.0.0-SNAPSHOT.jar + source-jar: service-launcher-b2b12-1.0.0-SNAPSHOT.jar replicas: 6 environments: connector_hadoop_kerberos-principal: ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM @@ -95,7 +95,7 @@ deploy: connector_hudi_victoria-push-url: ${deploy.runtime.hudi.victoria-push-url} connector_hudi_loki-push-url: ${deploy.runtime.hudi.loki-push-url} arguments: - spring_application_name: service-launcher-runner-b12 + spring_application_name: service-launcher-b12 connector_cluster_cluster: b12 connector_cluster_sync-queue-name: sync-queue-b12 connector_cluster_compaction-queue-name: compaction-queue-b12 diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/A4LauncherService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/A4LauncherService.java index 55fe4e6..f36e862 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/A4LauncherService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/A4LauncherService.java @@ -7,6 +7,6 @@ import com.lanyuanxiaoyao.service.forest.service.launcher.LauncherService; * @author lanyuanxiaoyao * @date 2023-06-06 */ -@BaseRequest(baseURL = "http://service-launcher-runner-a4") +@BaseRequest(baseURL = "http://service-launcher-a4") public interface A4LauncherService extends LauncherService { } diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B12LauncherService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B12LauncherService.java index b4c3fb1..1009ab0 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B12LauncherService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B12LauncherService.java @@ -7,6 +7,6 @@ import 
com.lanyuanxiaoyao.service.forest.service.launcher.LauncherService; * @author lanyuanxiaoyao * @date 2023-06-06 */ -@BaseRequest(baseURL = "http://service-launcher-runner-b12") +@BaseRequest(baseURL = "http://service-launcher-b12") public interface B12LauncherService extends LauncherService { } diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B1LauncherService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B1LauncherService.java index a674a63..2659d45 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B1LauncherService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B1LauncherService.java @@ -9,6 +9,6 @@ import com.lanyuanxiaoyao.service.forest.service.launcher.LauncherService; * @author lanyuanxiaoyao * @date 2023-06-06 */ -@BaseRequest(baseURL = "http://service-launcher-runner-b1") +@BaseRequest(baseURL = "http://service-launcher-b1") public interface B1LauncherService extends LauncherService { } diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B5LauncherService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B5LauncherService.java index a14d558..a61c76f 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B5LauncherService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/launcher/impl/B5LauncherService.java @@ -7,6 +7,6 @@ import com.lanyuanxiaoyao.service.forest.service.launcher.LauncherService; * @author lanyuanxiaoyao * @date 2023-06-06 */ -@BaseRequest(baseURL = "http://service-launcher-runner-b5") +@BaseRequest(baseURL = "http://service-launcher-b5") public interface B5LauncherService extends LauncherService { } diff --git a/service-launcher/pom.xml b/service-launcher/pom.xml new file mode 100644 index 
0000000..08ff946 --- /dev/null +++ b/service-launcher/pom.xml @@ -0,0 +1,176 @@ + + + 4.0.0 + + com.lanyuanxiaoyao + hudi-service + 1.0.0-SNAPSHOT + + + service-launcher + + + + com.lanyuanxiaoyao + service-configuration + 1.0.0-SNAPSHOT + + + org.apache.logging.log4j + log4j-to-slf4j + + + + + org.apache.curator + curator-recipes + 5.1.0 + + + org.apache.hudi + hudi-flink${flink.major.version}-bundle + ${hudi.version} + + + com.lanyuanxiaoyao + service-forest + 1.0.0-SNAPSHOT + + + + org.springframework.boot + spring-boot-starter-logging + + + org.apache.logging.log4j + log4j-to-slf4j + + + + + org.springframework.boot + spring-boot-starter-tomcat + + + com.eshore.odcp.hudi.connector + executor + 1.0.0-SNAPSHOT + + + com.sun.jersey + jersey-client + 1.19.4 + + + com.sun.jersey + jersey-core + 1.19.4 + + + com.sun.jersey.contribs + jersey-apache-client4 + 1.19.4 + + + org.apache.pulsar + pulsar-client + runtime + + + + + ${artifactId}-${build-tag}-${version} + + + org.apache.maven.plugins + maven-resources-plugin + 3.2.0 + + + copy-config-file + validate + + copy-resources + + + ${project.build.directory}/classes + + + ${project.parent.basedir}/config/${build-tag} + + *.xml + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + false + true + + + META-INF/spring.handlers + + + META-INF/spring.factories + + + META-INF/spring.schemas + + + + com.lanyuanxiaoyao.service.launcher.LauncherRunnerApplication + + + + reference.conf + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + core-default.xml + hdfs-default.xml + yarn-default.xml + log4j-surefire*.properties + + + + + + + package + + shade + + + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + + + + \ No newline at end of file diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java new file mode 100644 index 
0000000..9e457bc --- /dev/null +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java @@ -0,0 +1,355 @@ +package com.lanyuanxiaoyao.service.launcher; + +import cn.hutool.core.lang.Tuple; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.ReUtil; +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.Constants; +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.eshore.odcp.hudi.connector.exception.CheckpointRootPathNotFoundException; +import com.eshore.odcp.hudi.connector.utils.NameHelper; +import com.eshore.odcp.hudi.connector.utils.executor.Runner; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration; +import com.lanyuanxiaoyao.service.launcher.configuration.HudiConfiguration; +import com.lanyuanxiaoyao.service.launcher.utils.HadoopUtil; +import com.lanyuanxiaoyao.service.launcher.utils.JacksonUtil; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.regex.Pattern; +import javax.annotation.Resource; +import org.apache.flink.client.cli.ClientOptions; +import org.apache.flink.configuration.*; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.yarn.configuration.YarnConfigOptions; +import org.apache.flink.yarn.configuration.YarnDeploymentTarget; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hudi.common.util.collection.Pair; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import static com.eshore.odcp.hudi.connector.Constants.HALF_HOUR; +import static com.eshore.odcp.hudi.connector.Constants.MINUTE; + +/** + * 执行器 + * + * @author ZhangJiacheng + * @date 2022-06-06 + */ +@Service +public class ExecutorService { + private static final Logger logger = LoggerFactory.getLogger(ExecutorService.class); + private static final ObjectMapper MAPPER = JacksonUtil.getMapper(); + private static final Pattern EXECUTOR_JAR_NAME = Pattern.compile(".+sync-(\\d+)\\.jar"); + @Resource + private InfoService infoService; + @Resource + private HadoopConfiguration hadoopConfiguration; + @Resource + private HudiConfiguration hudiConfiguration; + + public static String[] syncArgs(FlinkJob flinkJob, ImmutableList tableMetaList, String cluster) throws JsonProcessingException { + List argsList = Lists.mutable.empty(); + argsList.add(Constants.FLINK_JOB_OPTION); + argsList.add(MAPPER.writeValueAsString(flinkJob)); + argsList.add(Constants.TABLE_META_LIST_OPTION); + argsList.add(MAPPER.writeValueAsString(tableMetaList)); + argsList.add(Constants.CLUSTER_OPTION); + argsList.add(cluster); + return argsList.toArray(new String[]{}); + } + + public static String[] compactionArgs(FlinkJob flinkJob, TableMeta tableMeta, String instants, String cluster) throws JsonProcessingException { + List argsList = Lists.mutable.empty(); + argsList.add(Constants.FLINK_JOB_OPTION); + argsList.add(MAPPER.writeValueAsString(flinkJob)); + argsList.add(Constants.TABLE_META_OPTION); + argsList.add(MAPPER.writeValueAsString(tableMeta)); + if (ObjectUtil.isNotEmpty(instants)) { + argsList.add(Constants.INSTANTS_OPTION); + argsList.add(instants); + } + argsList.add(Constants.CLUSTER_OPTION); + argsList.add(cluster); + return argsList.toArray(new String[]{}); + } + + private Memory jobMemory(FlinkJob flinkJob, ImmutableList tableMetas) { + int jobMemory = 1024; + int 
jobMetaspaceMemory = 256; + int taskMemory = 512; + int taskMetaspaceMemory = 256; + + switch (flinkJob.getRunMode()) { + case ALL_IN_ONE: + jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory); + jobMetaspaceMemory = tableMetas.size() * 25; + taskMemory = (int) tableMetas.sumOfInt(meta -> meta.getSyncYarn().getTaskManagerMemory()); + break; + case ALL_IN_ONE_BY_SCHEMA: + jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory); + jobMetaspaceMemory = tableMetas.distinctBy(TableMeta::getSchema).size() * 25; + taskMemory = (int) tableMetas.sumByInt(TableMeta::getSchema, meta -> meta.getSyncYarn().getTaskManagerMemory()).max(); + break; + case ALL_IN_ONE_BY_TABLE: + jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory); + jobMetaspaceMemory = tableMetas.distinctBy(TableMeta::getTable).size() * 25; + taskMemory = (int) tableMetas.sumByInt(TableMeta::getTable, meta -> meta.getSyncYarn().getTaskManagerMemory()).max(); + break; + case ONE_IN_ONE: + // jobMemory = tableMetas.collectInt(meta -> meta.getSyncYarn().getJobManagerMemory()).maxIfEmpty(jobMemory); + jobMemory = flinkJob.getOneInOneSyncYarn().getJobManagerMemory(); + jobMetaspaceMemory = tableMetas.size() * 25; + taskMemory = flinkJob.getOneInOneSyncYarn().getTaskManagerMemory(); + break; + default: + } + + taskMetaspaceMemory = Math.min(taskMemory / 3, tableMetas.size() * 25); + + return new Memory( + Math.max(512, jobMemory), + Math.max(256, jobMetaspaceMemory), + Math.max(512, taskMemory), + Math.max(256, taskMetaspaceMemory) + ); + } + + private String getLatestExecutorJarPath() throws IOException { + try (FileSystem fileSystem = HadoopUtil.createFileSystem(HadoopUtil.createConfiguration(hadoopConfiguration))) { + Path root = new Path(hudiConfiguration.getAppHdfsPath()); + return Lists.immutable.of(fileSystem.listStatus(root)) + .select(FileStatus::isFile) + 
.collect(FileStatus::getPath) + .collect(Path::toString) + .select(path -> ReUtil.isMatch(EXECUTOR_JAR_NAME, path)) + .collect(path -> new Tuple(path, getLatestExecutorJarVersion(path))) + .reject(tuple -> tuple.get(1) < 0) + .maxByOptional(tuple -> tuple.get(1)) + .orElseThrow(() -> new RuntimeException("Latest jar path not found")) + .get(0); + } + } + + private Long getLatestExecutorJarVersion(String path) { + return Optional.ofNullable(ReUtil.get(EXECUTOR_JAR_NAME, path, 1)).map(Long::valueOf).orElse(-1L); + } + + private void setEnvironment(Configuration configuration, String key, String value) { + configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + key, value); + configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + key, value); + } + + private Configuration commonConfiguration(String keytabPath, String principal) { + Configuration configuration = new Configuration(); + + configuration.setBoolean(JobManagerOptions.JVM_DIRECT_MEMORY_LIMIT_ENABLED, true); + // configuration.setString(JobManagerOptions.ARCHIVE_DIR, hudiConfiguration.getArchiveHdfsPath()); + + configuration.setString(AkkaOptions.ASK_TIMEOUT, "10 min"); + configuration.setString(AkkaOptions.TCP_TIMEOUT, "15 min"); + configuration.setString(AkkaOptions.LOOKUP_TIMEOUT, "10 min"); + configuration.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(30)); + + // Kerberos认证 + configuration.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, true); + if (StrUtil.isNotBlank(keytabPath)) { + configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath); + } + if (StrUtil.isNotBlank(principal)) { + configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal); + } + + // configuration.setString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS, "0.0.0.0"); + // configuration.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 8083); + + configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 
MINUTE); + configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, HALF_HOUR); + + configuration.setString(AkkaOptions.ASK_TIMEOUT, "1 min"); + configuration.setString(AkkaOptions.TCP_TIMEOUT, "2 min"); + + configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, false); + + configuration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "4"); + // configuration.setString(YarnConfigOptions.STAGING_DIRECTORY, "hdfs://b2/apps/datalake/yarn"); + + configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1"); + configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1"); + + configuration.setString(RestOptions.BIND_PORT, "8084-9400"); + + configuration.setString(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName()); + + if (StrUtil.isNotBlank(hudiConfiguration.getVictoriaPushUrl())) { + configuration.setString("metrics.reporter.victoria.class", "com.eshore.odcp.hudi.connector.utils.executor.metrics.VictoriaMetricsReporter"); + configuration.setString("metrics.reporter.victoria.endpoint", hudiConfiguration.getVictoriaPushUrl()); + configuration.setBoolean("metrics.reporter.victoria.enable.auth", true); + configuration.setString("metrics.reporter.victoria.auth.username", "EsCFVuNkiDWv7PKmcF"); + configuration.setString("metrics.reporter.victoria.auth.password", "Abf%x9ocS^iKr3tgrd"); + } + + // 设置 JVM 参数 + // configuration.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, "-XX:CMSInitiatingOccupancyFraction=40"); + // configuration.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, "-XX:+UseG1GC -XX:MaxGCPauseMillis=1500 -XX:InitiatingHeapOccupancyPercent=30 -XX:SurvivorRatio=4 -XX:ParallelGCThreads=60"); + + return configuration; + } + + public ApplicationId runSync(Long flinkJobId, String keytabPath, String principal, String cluster) throws Exception { + FlinkJob flinkJob = infoService.flinkJobDetail(flinkJobId); + ImmutableList tableMetas = 
infoService.tableMetaList(flinkJobId); + if (ObjectUtil.isEmpty(tableMetas)) { + throw new RuntimeException("Table metas not found"); + } + return runSync(flinkJob, tableMetas, keytabPath, principal, cluster); + } + + public ApplicationId runSync(FlinkJob flinkJob, ImmutableList tableMetaList, String keytabPath, String principal, String cluster) throws Exception { + logger.info("Running Sync: {} {} {}", flinkJob.getId(), keytabPath, principal); + + try (FileSystem fileSystem = HadoopUtil.createFileSystem(HadoopUtil.createConfiguration(hadoopConfiguration))) { + String checkpointRootPath = tableMetaList.collect(meta -> meta.getConfig().getCheckpointRootPath()) + .distinct() + .getFirstOptional() + .orElseThrow(CheckpointRootPathNotFoundException::new); + String checkpointPath = checkpointRootPath + "/" + flinkJob.getId(); + logger.info("Delete checkpoint path {}", checkpointPath); + fileSystem.delete(new Path(checkpointPath), true); + } + + Configuration configuration = commonConfiguration(keytabPath, principal); + + Memory memory = jobMemory(flinkJob, tableMetaList); + // JobManager的内存 + MemorySize jobMemorySize = MemorySize.parse(memory.jobMemory + "m"); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, jobMemorySize); + configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.parse(memory.jobMetaspaceMemory + "m")); + + // TaskManager的内存 + MemorySize taskMemorySize = MemorySize.parse(memory.taskMemory + "m"); + configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, taskMemorySize); + configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("0m")); + configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.parse(memory.taskMetaspaceMemory + "m")); + + // 并行度 + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10); + + configuration.setString(YarnConfigOptions.APPLICATION_NAME, NameHelper.syncJobName(flinkJob.getId(), flinkJob.getName())); + + // 
configuration.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "hdfs://b2/apps/flink/completed-jobs/"); + // configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000); + + // 业务jar包 + String executorJarPath = getLatestExecutorJarPath(); + logger.info("Executor jar path: {}", executorJarPath); + Long executorJarVersion = getLatestExecutorJarVersion(executorJarPath); + configuration.set(PipelineOptions.JARS, new ArrayList() {{ + add(executorJarPath); + }}); + + String flinkJobName = flinkJob.getName().replaceAll("\\s", "_"); + + setEnvironment(configuration, Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_SYNC); + setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_ID, String.valueOf(flinkJob.getId())); + setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJobName); + setEnvironment(configuration, Constants.METRICS_LABEL_EXECUTOR_VERSION, String.valueOf(executorJarVersion)); + setEnvironment(configuration, Constants.METRICS_LABEL_CLUSTER, cluster); + + setEnvironment(configuration, Constants.LOKI_PUSH_URL, hudiConfiguration.getLokiPushUrl()); + + configuration.setString("metrics.reporter.victoria.tags", Lists.immutable.of( + Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_SYNC), + Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()), + Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJobName), + Pair.of(Constants.METRICS_LABEL_EXECUTOR_VERSION, executorJarVersion) + ).collect(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).makeString(";")); + + return Runner.run( + configuration, + "com.eshore.odcp.hudi.connector.sync.Synchronizer", + syncArgs(flinkJob, tableMetaList, cluster) + ); + } + + public ApplicationId runCompaction(String batchCode, FlinkJob flinkJob, TableMeta tableMeta, String keytabPath, String principal, String instants, String cluster) throws Exception { + logger.info("Running Compaction: {} {} {} {} {}", 
batchCode, flinkJob.getId(), tableMeta.getAlias(), keytabPath, principal); + + Configuration configuration = commonConfiguration(keytabPath, principal); + + MemorySize jobMemorySize = MemorySize.parse(tableMeta.getCompactionYarn().getJobManagerMemory() + "m"); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, jobMemorySize); + + MemorySize taskMemorySize = MemorySize.parse(tableMeta.getCompactionYarn().getTaskManagerMemory() + "m"); + configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, taskMemorySize); + configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("0m")); + + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10); + + configuration.setString(YarnConfigOptions.APPLICATION_NAME, NameHelper.compactionJobName(flinkJob.getId(), tableMeta.getAlias())); + + String executorJarPath = getLatestExecutorJarPath(); + logger.info("Executor jar path: {}", executorJarPath); + Long executorJarVersion = getLatestExecutorJarVersion(executorJarPath); + configuration.set(PipelineOptions.JARS, new ArrayList() {{ + add(executorJarPath); + }}); + + String flinkJobName = flinkJob.getName().replaceAll("\\s", "_"); + + setEnvironment(configuration, Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_COMPACTION); + setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_ID, String.valueOf(flinkJob.getId())); + setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJobName); + setEnvironment(configuration, Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()); + setEnvironment(configuration, Constants.METRICS_LABEL_TABLE, tableMeta.getTable()); + setEnvironment(configuration, Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias()); + setEnvironment(configuration, Constants.METRICS_LABEL_BATCH_ID, batchCode); + setEnvironment(configuration, Constants.METRICS_LABEL_EXECUTOR_VERSION, String.valueOf(executorJarVersion)); + setEnvironment(configuration, Constants.METRICS_LABEL_CLUSTER, cluster); + + 
package com.lanyuanxiaoyao.service.launcher;

import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScans;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Application entry point for the service-launcher module.
 *
 * <p>Boots a Spring application with service discovery, scheduled tasks,
 * {@code @ConfigurationProperties} binding and Jasypt-encrypted properties
 * enabled. Gson auto-configuration is excluded so Jackson stays the only
 * JSON mapper. Component scanning covers the whole
 * {@code com.lanyuanxiaoyao.service} package so sibling modules on the
 * classpath are picked up as well.
 *
 * @author ZhangJiacheng
 * @date 2023-04-05
 */
@EnableDiscoveryClient
@SpringBootApplication(exclude = {GsonAutoConfiguration.class})
@ComponentScans({
        @ComponentScan("com.lanyuanxiaoyao.service"),
})
@EnableScheduling
@EnableConfigurationProperties
@EnableEncryptableProperties
public class LauncherRunnerApplication {
    // NOTE(review): declared but never used in this class; kept for parity
    // with the other entry points in the project.
    private static final Logger logger = LoggerFactory.getLogger(LauncherRunnerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(LauncherRunnerApplication.class, args);
    }
}
com.lanyuanxiaoyao.service.forest.service.InfoService; +import com.lanyuanxiaoyao.service.forest.service.ZookeeperService; +import com.lanyuanxiaoyao.service.launcher.configuration.ClusterConfiguration; +import java.io.IOException; +import java.util.EnumSet; +import javax.annotation.PreDestroy; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * 提供和集群绑定的一些操作 + * + * @author ZhangJiacheng + * @date 2023-05-30 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@RestController +@RequestMapping("launcher/compaction") +public class CompactionController { + private static final Logger logger = LoggerFactory.getLogger(CompactionController.class); + + private final DiscoveryClient discoveryClient; + private final ClusterConfiguration clusterConfiguration; + private final InfoService infoService; + private final ZookeeperService zookeeperService; + private final YarnClient yarnClient; + + public CompactionController(DiscoveryClient discoveryClient, ClusterConfiguration clusterConfiguration, InfoService infoService, ZookeeperService zookeeperService) { + this.discoveryClient = discoveryClient; + this.clusterConfiguration = clusterConfiguration; + this.infoService = infoService; + this.zookeeperService = zookeeperService; + + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration()); + yarnClient.start(); + } + + @PreDestroy 
+ public void destroy() throws IOException { + if (ObjectUtil.isNotNull(yarnClient)) { + yarnClient.stop(); + yarnClient.close(); + } + } + + @GetMapping("stop") + public void stop( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias, + @RequestParam(value = "disable_meta", defaultValue = "true") Boolean disableMeta + ) { + logger.info("Enter method: stop[flinkJobId, alias]. " + "flinkJobId:" + flinkJobId + "," + "alias:" + alias); + if (disableMeta) { + logger.info("Delete table info"); + infoService.disableTable(flinkJobId, alias); + } + QueueUtil.remove(discoveryClient, clusterConfiguration.getCompactionQueueName(), StrUtil.format("{}-{}", flinkJobId, alias)); + // 再删除一次,避免时间差导致任务重新放回预调度队列 + QueueUtil.remove(discoveryClient, Constants.COMPACTION_QUEUE_PRE, StrUtil.format("{}-{}", flinkJobId, alias)); + logger.info("Remove job form queue"); + try { + RunMeta meta = zookeeperService.getSyncRunMeta(flinkJobId, alias); + if (ObjectUtil.isNotNull(meta) && ObjectUtil.isNotEmpty(meta.getApplicationId())) { + String applicationId = meta.getApplicationId(); + stopApp(applicationId); + logger.info("Found application id and kill: {}", applicationId); + } else { + SyncState syncState = infoService.syncStateDetail(flinkJobId, alias); + if (ObjectUtil.isNotEmpty(syncState.getCompactionApplicationId())) { + String applicationId = syncState.getCompactionApplicationId(); + stopApp(applicationId); + logger.info("Found application id and kill: {}", applicationId); + } + } + } catch (Throwable throwable) { + logger.warn("Application id not found, it may stop yet"); + } + } + + @GetMapping("stop_app") + public void stopApp(@RequestParam("application_id") String applicationId) throws IOException, YarnException { + boolean exists = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING)) + .stream() + .anyMatch(r -> StrUtil.equals(r.getApplicationId().toString(), applicationId)); + if (exists) { + 
yarnClient.killApplication(ApplicationId.fromString(applicationId)); + } else { + logger.info("Application: {} not running", applicationId); + } + } +} diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java new file mode 100644 index 0000000..781632b --- /dev/null +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java @@ -0,0 +1,274 @@ +package com.lanyuanxiaoyao.service.launcher.compaction.service; + +import cn.hutool.core.thread.ThreadUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.Constants; +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.eshore.odcp.hudi.connector.entity.compaction.ScheduleJob; +import com.eshore.odcp.hudi.connector.utils.LogHelper; +import com.eshore.odcp.hudi.connector.utils.NameHelper; +import com.eshore.odcp.hudi.connector.utils.TableMetaHelper; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan; +import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; +import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem; +import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil; +import com.lanyuanxiaoyao.service.forest.service.HudiService; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import com.lanyuanxiaoyao.service.launcher.ExecutorService; +import com.lanyuanxiaoyao.service.launcher.configuration.ClusterConfiguration; +import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration; +import com.lanyuanxiaoyao.service.launcher.configuration.ZookeeperConfiguration; 
/**
 * Polls compaction jobs from a distributed queue and launches Hudi
 * compaction runs on YARN through {@link ExecutorService}.
 *
 * <p>Concurrency model: a scheduled driver fans out to a small worker pool;
 * each worker drains the queue. A per-(flinkJobId, alias) Zookeeper mutex
 * prevents two launchers from starting the same compaction, and a separate
 * "running" znode marks jobs already executing. Items that cannot run right
 * now are parked and re-queued in batches.
 *
 * @author ZhangJiacheng
 * @date 2023-05-08
 */
@Service
public class CompactionService {
    private static final Logger logger = LoggerFactory.getLogger(CompactionService.class);
    // Shared pool used to read compaction plans for the selected instants in parallel.
    private static final java.util.concurrent.ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
    // Retries transient failures up to 3 times with a 1-second delay.
    private static final RetryPolicy RETRY_POLICY = RetryPolicy.builder()
            .handle(Exception.class)
            .withDelay(Duration.ofSeconds(1))
            .withMaxAttempts(3)
            .build();
    // Sizing heuristics: one parallelism unit per 2 files, clamped to [10, 500].
    private static final int FILE_PER_PARALLELISM = 2;
    private static final int MAX_PARALLELISM = 500;
    private static final int MIN_PARALLELISM = 10;
    private final HadoopConfiguration hadoopConfiguration;
    private final ClusterConfiguration clusterConfiguration;
    private final DiscoveryClient discoveryClient;
    private final CuratorFramework zookeeperClient;
    private final InfoService infoService;
    private final HudiService hudiService;
    private final ExecutorService executorService;
    private final ObjectMapper mapper;

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    public CompactionService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ZookeeperConfiguration zookeeperConfiguration, DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ExecutorService executorService, Jackson2ObjectMapperBuilder builder) {
        this.hadoopConfiguration = hadoopConfiguration;
        this.clusterConfiguration = clusterConfiguration;
        this.discoveryClient = discoveryClient;
        this.infoService = infoService;
        this.hudiService = hudiService;
        this.executorService = executorService;
        mapper = builder.build();
        // Dedicated Curator client for the launcher locks.
        this.zookeeperClient = CuratorFrameworkFactory.builder()
                .connectString(zookeeperConfiguration.getConnectUrl())
                .retryPolicy(new ExponentialBackoffRetry((int) Constants.SECOND, 3))
                .sessionTimeoutMs((int) Constants.SECOND)
                .connectionTimeoutMs((int) (30 * Constants.SECOND))
                .build();
        this.zookeeperClient.start();
    }

    /** Closes the Curator client on shutdown. */
    @PreDestroy
    public void destroy() {
        if (ObjectUtil.isNotNull(this.zookeeperClient)) {
            this.zookeeperClient.close();
        }
    }

    /**
     * Scheduled driver: when the queue is not empty, runs 5 concurrent
     * {@link #compact()} workers and blocks until all of them finish.
     */
    @Scheduled(fixedDelay = 5, initialDelay = 30, timeUnit = TimeUnit.SECONDS)
    public void scheduleCompact() {
        // NOTE(review): when the queue IS empty this pool is created but never
        // shut down; consider moving the creation inside the if-block.
        java.util.concurrent.ExecutorService threadPool = Executors.newWorkStealingPool(5);
        if (Boolean.FALSE.equals(QueueUtil.isEmpty(discoveryClient, clusterConfiguration.getCompactionQueueName()))) {
            for (int index = 0; index < 5; index++) {
                threadPool.submit(this::compact);
            }
            threadPool.shutdown();
            while (!threadPool.isTerminated()) {
                ThreadUtil.safeSleep(10 * Constants.SECOND);
            }
            logger.info("Finish compaction schedule");
        }
    }

    /**
     * Drains the compaction queue: for each item, takes the launcher lock,
     * verifies the job is not already running, sizes the parallelism from the
     * pending compaction plans, and submits the compaction to YARN.
     */
    @SuppressWarnings("DataFlowIssue")
    private void compact() {
        // Items that cannot run right now (locked / already running) are parked
        // here until a runnable item is met, then sent back to the queue.
        MutableList<QueueItem<ScheduleJob>> holder = Lists.mutable.empty();

        QueueItem<ScheduleJob> item = QueueUtil.poll(discoveryClient, this.mapper, clusterConfiguration.getCompactionQueueName());
        // poll() returns null when the queue is empty, which ends the loop.
        while (ObjectUtil.isNotNull(item)) {
            LogHelper.setMdc(Constants.LOG_JOB_ID_LABEL, item.getTraceId());
            try {
                // Skip items that carry no payload.
                if (ObjectUtil.isNotNull(item.getData())) {
                    ScheduleJob job = item.getData();
                    LogHelper.setMdcFlinkJobAndAlias(job.getFlinkJobId(), job.getAlias());
                    logger.info("Receive job[{}]({}): {}", item.getTraceId(), item.getCreateTime(), item.getData());
                    // Per-job launcher lock in Zookeeper.
                    String lockPath = NameHelper.compactionLauncherLockPath(job.getFlinkJobId(), job.getAlias());
                    InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath);
                    try {
                        if (lock.acquire(2, TimeUnit.SECONDS)) {
                            Stat stat = zookeeperClient.checkExists().forPath(NameHelper.compactionRunningLockPath(job.getFlinkJobId(), job.getAlias()));
                            if (ObjectUtil.isNotNull(stat)) {
                                logger.info("Job {} {} is running", job.getFlinkJobId(), job.getAlias());
                                // Already running: park the item for later.
                                holder.add(item);
                                // The outer finally polls the next item, so a
                                // plain continue still advances the loop.
                                continue;
                            }
                            FlinkJob flinkJob = infoService.flinkJobDetail(job.getFlinkJobId());
                            TableMeta meta = infoService.tableMetaDetail(job.getFlinkJobId(), job.getAlias());
                            if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) {
                                logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias());
                                clearHolder(holder);
                                continue;
                            }
                            logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias());
                            // Bail out early when the Hudi table does not exist.
                            if (!hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias())) {
                                logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias());
                                clearHolder(holder);
                                continue;
                            }
                            // Instants with pending compactions to process.
                            ImmutableList<HudiInstant> selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias());
                            if (ObjectUtil.isEmpty(selectedInstants)) {
                                logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias());
                                clearHolder(holder);
                                continue;
                            }
                            logger.info("[{}] [{}] Selected Instants: {}", flinkJob.getId(), meta.getAlias(), selectedInstants.makeString(","));
                            // Estimate the number of files to compact.
                            long count = predictCompactFileCount(meta, selectedInstants);
                            // NOTE(review): count is a primitive long, so this
                            // null check is always true after autoboxing.
                            if (ObjectUtil.isNotNull(count)) {
                                // Derive the parallelism from the file count.
                                long parallelism = predictParallelism(count);
                                logger.info("[{}] [{}] Predict compact files: {} {}", flinkJob.getId(), meta.getAlias(), count, parallelism);
                                meta.getHudi().setCompactionTasks((int) parallelism);
                            }
                            logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias());
                            // Submit with retries, then persist the application id with retries.
                            String applicationId = Failsafe.with(RETRY_POLICY)
                                    .get(() -> executorService.runCompaction(
                                            job.getBatch(),
                                            flinkJob,
                                            meta,
                                            hadoopConfiguration.getKerberosKeytabPath(),
                                            hadoopConfiguration.getKerberosPrincipal(),
                                            selectedInstants.collect(HudiInstant::getTimestamp).makeString(","),
                                            clusterConfiguration.getCluster()
                                    ).toString());
                            Failsafe.with(RETRY_POLICY)
                                    .run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId));
                            clearHolder(holder);
                        } else {
                            logger.warn("Un acquire lock for " + item.getId());
                            holder.add(item);
                        }
                    } catch (Exception e) {
                        logger.warn(StrUtil.format("[{}] [{}] Try lock something wrong ", job.getFlinkJobId(), job.getAlias()), e);
                        // Track the failure count in the item metadata; drop the
                        // item after more than 5 failed attempts.
                        String failCount = item.getMetadata(Constants.SCHEDULE_JOB_FAIL_COUNT);
                        if (StrUtil.isNotBlank(failCount)) {
                            int fail = Integer.parseInt(failCount);
                            if (fail > 5) {
                                logger.error("Job {} cause unaccepted error", item);
                                continue;
                            } else {
                                item.getMetadata().put(Constants.SCHEDULE_JOB_FAIL_COUNT, String.valueOf(fail + 1));
                            }
                        } else {
                            item.getMetadata().put(Constants.SCHEDULE_JOB_FAIL_COUNT, "1");
                        }
                        QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, item);
                    } finally {
                        // Always try to release the launcher lock.
                        try {
                            if (lock.isAcquiredInThisProcess()) {
                                lock.release();
                            }
                        } catch (Exception e) {
                            logger.error("Release lock failure " + lockPath, e);
                        }
                    }
                } else {
                    logger.warn("Schedule job is empty. [{}]({}): {}", item.getTraceId(), item.getCreateTime(), item);
                }
            } finally {
                // Always try to fetch the next item and clean the MDC.
                item = QueueUtil.poll(discoveryClient, this.mapper, clusterConfiguration.getCompactionQueueName());
                LogHelper.removeMdc(Constants.LOG_JOB_ID_LABEL, Constants.LOG_FLINK_JOB_ID_LABEL, Constants.LOG_ALIAS_LABEL);
            }
        }
        clearHolder(holder);
    }

    /**
     * Sends every parked item back to the pre-schedule queue after a short
     * pause, then empties the holder. No-op if the holder is empty.
     */
    private void clearHolder(MutableList<QueueItem<ScheduleJob>> holder) {
        if (holder.isEmpty()) {
            return;
        }
        // Wait half a minute before re-queuing to avoid a tight requeue loop.
        ThreadUtil.safeSleep(Constants.HALF_MINUTE);
        for (QueueItem<ScheduleJob> item : holder) {
            QueueUtil.add(discoveryClient, this.mapper, Constants.COMPACTION_QUEUE_PRE, item);
        }
        holder.clear();
    }

    /**
     * Sums the operation counts of the compaction plans of the selected
     * instants (read in parallel). Plans that fail to load count as 0. Falls
     * back to the table's bucket index number when the total is below 1.
     */
    private Long predictCompactFileCount(TableMeta meta, ImmutableList<HudiInstant> selectedInstants) {
        long count = selectedInstants.asParallel(EXECUTOR, 1)
                .sumOfLong(instant -> {
                    try {
                        HudiCompactionPlan plan = hudiService.readCompactionPlan(meta.getJob().getId(), meta.getAlias(), instant.getTimestamp());
                        return plan.getOperations().size();
                    } catch (Exception e) {
                        logger.error("Read compaction plan failure", e);
                    }
                    return 0;
                });
        return count < 1 ? meta.getHudi().getBucketIndexNumber() : count;
    }

    /**
     * Maps a file count to a parallelism: one unit per
     * {@link #FILE_PER_PARALLELISM} files, clamped to
     * [{@link #MIN_PARALLELISM}, {@link #MAX_PARALLELISM}].
     */
    private Integer predictParallelism(Long count) {
        long parallelism = Math.round(count * 1.0 / FILE_PER_PARALLELISM);
        if (parallelism > MAX_PARALLELISM) {
            parallelism = MAX_PARALLELISM;
        }
        if (parallelism < MIN_PARALLELISM) {
            parallelism = MIN_PARALLELISM;
        }
        return Math.toIntExact(parallelism);
    }

    /**
     * Parses a queue item from its JSON body; returns null on blank input or
     * parse failure.
     * NOTE(review): currently unused in this class — candidate for removal.
     */
    private QueueItem<ScheduleJob> deserialize(String body) {
        if (StrUtil.isBlank(body)) {
            return null;
        }
        try {
            return mapper.readValue(body, new TypeReference<QueueItem<ScheduleJob>>() {
            });
        } catch (Throwable error) {
            logger.error("Schedule job parse error", error);
            return null;
        }
    }
}
} + + public void setSyncQueueName(String syncQueueName) { + this.syncQueueName = syncQueueName; + } + + public String getCompactionQueueName() { + return compactionQueueName; + } + + public void setCompactionQueueName(String compactionQueueName) { + this.compactionQueueName = compactionQueueName; + } + + @Override + public String toString() { + return "ClusterConfiguration{" + + "cluster='" + cluster + '\'' + + ", syncQueueName='" + syncQueueName + '\'' + + ", compactionQueueName='" + compactionQueueName + '\'' + + '}'; + } +} diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HadoopConfiguration.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HadoopConfiguration.java new file mode 100644 index 0000000..5645589 --- /dev/null +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HadoopConfiguration.java @@ -0,0 +1,51 @@ +package com.lanyuanxiaoyao.service.launcher.configuration; + +import javax.annotation.PostConstruct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * yarn 配置 + * + * @author ZhangJiacheng + * @date 2022-03-30 + */ +@ConfigurationProperties("connector.hadoop") +@Component +public class HadoopConfiguration { + private static final Logger logger = LoggerFactory.getLogger(HadoopConfiguration.class); + + private String kerberosPrincipal; + private String kerberosKeytabPath; + + @PostConstruct + private void init() { + logger.info("Configuration initial: {}", this); + } + + public String getKerberosPrincipal() { + return kerberosPrincipal; + } + + public void setKerberosPrincipal(String kerberosPrincipal) { + this.kerberosPrincipal = kerberosPrincipal; + } + + public String getKerberosKeytabPath() { + return kerberosKeytabPath; + } + + public void setKerberosKeytabPath(String kerberosKeytabPath) 
{ + this.kerberosKeytabPath = kerberosKeytabPath; + } + + @Override + public String toString() { + return "HadoopConfiguration{" + + "kerberosPrincipal='" + kerberosPrincipal + '\'' + + ", kerberosKeytabPath='" + kerberosKeytabPath + '\'' + + '}'; + } +} diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HudiConfiguration.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HudiConfiguration.java new file mode 100644 index 0000000..c76495a --- /dev/null +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HudiConfiguration.java @@ -0,0 +1,61 @@ +package com.lanyuanxiaoyao.service.launcher.configuration; + +import javax.annotation.PostConstruct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * Hudi 配置 + * + * @author ZhangJiacheng + * @date 2023-02-03 + */ +@ConfigurationProperties("connector.hudi") +@Component +public class HudiConfiguration { + private static final Logger logger = LoggerFactory.getLogger(HudiConfiguration.class); + + private String appHdfsPath; + private String victoriaPushUrl; + private String lokiPushUrl; + + @PostConstruct + private void init() { + logger.info("Configuration initial: {}", this); + } + + public String getAppHdfsPath() { + return appHdfsPath; + } + + public void setAppHdfsPath(String appHdfsPath) { + this.appHdfsPath = appHdfsPath; + } + + public String getVictoriaPushUrl() { + return victoriaPushUrl; + } + + public void setVictoriaPushUrl(String victoriaPushUrl) { + this.victoriaPushUrl = victoriaPushUrl; + } + + public String getLokiPushUrl() { + return lokiPushUrl; + } + + public void setLokiPushUrl(String lokiPushUrl) { + this.lokiPushUrl = lokiPushUrl; + } + + @Override + public String toString() { + return "HudiConfiguration{" + + "appHdfsPath='" + appHdfsPath + '\'' + 
+ ", victoriaPushUrl='" + victoriaPushUrl + '\'' + + ", lokiPushUrl='" + lokiPushUrl + '\'' + + '}'; + } +} diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/ZookeeperConfiguration.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/ZookeeperConfiguration.java new file mode 100644 index 0000000..505a6dd --- /dev/null +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/ZookeeperConfiguration.java @@ -0,0 +1,41 @@ +package com.lanyuanxiaoyao.service.launcher.configuration; + +import javax.annotation.PostConstruct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * Zookeeper 配置 + * + * @author ZhangJiacheng + * @date 2023-05-04 + */ +@ConfigurationProperties("connector.zookeeper") +@Component +public class ZookeeperConfiguration { + private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfiguration.class); + + private String connectUrl; + + @PostConstruct + private void init() { + logger.info("Configuration initial: {}", this); + } + + public String getConnectUrl() { + return connectUrl; + } + + public void setConnectUrl(String connectUrl) { + this.connectUrl = connectUrl; + } + + @Override + public String toString() { + return "ZookeeperConfiguration{" + + "connectUrl='" + connectUrl + '\'' + + '}'; + } +} diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/controller/SynchronizerController.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/controller/SynchronizerController.java new file mode 100644 index 0000000..26cdb8f --- /dev/null +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/controller/SynchronizerController.java @@ -0,0 +1,40 @@ +package 
com.lanyuanxiaoyao.service.launcher.synchronizer.controller; + +import com.lanyuanxiaoyao.service.launcher.synchronizer.service.SynchronizerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * 同步控制 + * + * @author ZhangJiacheng + * @date 2023-12-07 + */ +@RestController +@RequestMapping("launcher/synchronizer") +public class SynchronizerController { + private static final Logger logger = LoggerFactory.getLogger(SynchronizerController.class); + + private final SynchronizerService synchronizerService; + + public SynchronizerController(@Qualifier("sync-service") SynchronizerService synchronizerService) { + this.synchronizerService = synchronizerService; + } + + @GetMapping("start") + public String start(@RequestParam("flink_job_id") Long flinkJobId) throws Exception { + logger.info("Try to start job: {}", flinkJobId); + return synchronizerService.start(flinkJobId); + } + + @GetMapping("stop") + public void stop(@RequestParam("flink_job_id") Long flinkJobId) throws Exception { + logger.info("Try to start job: {}", flinkJobId); + synchronizerService.stop(flinkJobId); + } +} diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/service/SynchronizerService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/service/SynchronizerService.java new file mode 100644 index 0000000..c81f4b4 --- /dev/null +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/synchronizer/service/SynchronizerService.java @@ -0,0 +1,137 @@ +package com.lanyuanxiaoyao.service.launcher.synchronizer.service; + +import cn.hutool.core.util.ObjectUtil; +import 
cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.Constants; +import com.eshore.odcp.hudi.connector.entity.RunMeta; +import com.eshore.odcp.hudi.connector.utils.NameHelper; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.launcher.ExecutorService; +import com.lanyuanxiaoyao.service.launcher.configuration.ClusterConfiguration; +import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration; +import com.lanyuanxiaoyao.service.launcher.configuration.ZookeeperConfiguration; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.locks.InterProcessLock; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; +import org.springframework.stereotype.Service; + +/** + * Sync 服务 + * + * @author ZhangJiacheng + * @date 2023-12-07 + */ +@Service("sync-service") +public class SynchronizerService { + private static final Logger logger = LoggerFactory.getLogger(SynchronizerService.class); + private static final RetryPolicy RETRY_POLICY = RetryPolicy.builder() + .handle(Exception.class) + .withDelay(Duration.ofSeconds(1)) + .withMaxAttempts(3) + .build(); + + private final HadoopConfiguration hadoopConfiguration; + private final ClusterConfiguration clusterConfiguration; + private final CuratorFramework 
zookeeperClient; + private final ExecutorService executorService; + private final YarnClient yarnClient; + private final ObjectMapper mapper; + + public SynchronizerService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ExecutorService executorService, ZookeeperConfiguration zookeeperConfiguration, Jackson2ObjectMapperBuilder builder) { + this.hadoopConfiguration = hadoopConfiguration; + this.clusterConfiguration = clusterConfiguration; + this.executorService = executorService; + + mapper = builder.build(); + + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration()); + yarnClient.start(); + + zookeeperClient = CuratorFrameworkFactory.builder() + .connectString(zookeeperConfiguration.getConnectUrl()) + .retryPolicy(new ExponentialBackoffRetry((int) Constants.SECOND, 3)) + .sessionTimeoutMs((int) Constants.SECOND) + .connectionTimeoutMs((int) (30 * Constants.SECOND)) + .build(); + zookeeperClient.start(); + } + + @PreDestroy + public void destroy() throws IOException { + if (ObjectUtil.isNotNull(yarnClient)) { + yarnClient.stop(); + yarnClient.close(); + } + + if (ObjectUtil.isNotNull(this.zookeeperClient)) { + this.zookeeperClient.close(); + } + } + + public String start(Long flinkJobId) throws Exception { + String lockPath = NameHelper.syncLauncherLockPath(flinkJobId); + InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath); + try { + if (lock.acquire(2, TimeUnit.SECONDS)) { + Stat stat = zookeeperClient.checkExists().forPath(NameHelper.syncRunningLockPath(flinkJobId, null)); + if (ObjectUtil.isNotNull(stat)) { + String message = StrUtil.format("Job {} is running", flinkJobId); + logger.warn(message); + throw new RuntimeException(message); + } + ApplicationId applicationId = Failsafe + .with(RETRY_POLICY) + .get(() -> executorService.runSync( + flinkJobId, + hadoopConfiguration.getKerberosKeytabPath(), + hadoopConfiguration.getKerberosPrincipal(), + clusterConfiguration.getCluster() 
+ )); + return applicationId.toString(); + } else { + String message = StrUtil.format("Un acquire lock for {}", flinkJobId); + logger.warn(message); + throw new RuntimeException(message); + } + } finally { + if (lock.isAcquiredInThisProcess()) { + lock.release(); + } + } + } + + public void stop(Long flinkJobId) throws Exception { + String syncLockPath = NameHelper.syncRunningLockPath(flinkJobId, null); + Stat stat = zookeeperClient.checkExists().forPath(syncLockPath); + if (ObjectUtil.isNull(stat)) { + return; + } + byte[] bytes = zookeeperClient.getData().forPath(syncLockPath); + RunMeta meta = mapper.readValue(new String(bytes), RunMeta.class); + logger.info("Run meta: {}", meta); + String applicationIdText = meta.getApplicationId(); + if (StrUtil.isBlank(applicationIdText)) { + throw new RuntimeException(StrUtil.format("Cannot parse application id from zookeeper: {}", syncLockPath)); + } + ApplicationId applicationId = ApplicationId.fromString(applicationIdText); + Failsafe.with(RETRY_POLICY) + .run(() -> yarnClient.killApplication(applicationId)); + logger.info("{} have be killed", applicationId); + } +} diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/HadoopUtil.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/HadoopUtil.java new file mode 100644 index 0000000..ca26a73 --- /dev/null +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/HadoopUtil.java @@ -0,0 +1,64 @@ +package com.lanyuanxiaoyao.service.launcher.utils; + +import cn.hutool.core.util.ObjectUtil; +import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.eclipse.collections.api.factory.Lists; +import 
org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Hadoop 工具类 + * + * @author ZhangJiacheng + * @date 2022-04-25 + */ +public class HadoopUtil { + private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class); + + public static Configuration createConfiguration() throws IOException { + return createConfiguration(null); + } + + public static Configuration createConfiguration(HadoopConfiguration configuration) throws IOException { + Configuration hadoopConfiguration = new Configuration(); + if (ObjectUtil.isNull(configuration)) { + return hadoopConfiguration; + } + UserGroupInformation.setConfiguration(hadoopConfiguration); + if (UserGroupInformation.isSecurityEnabled()) { + UserGroupInformation.loginUserFromKeytab(configuration.getKerberosPrincipal(), configuration.getKerberosKeytabPath()); + } + return hadoopConfiguration; + } + + public static FileSystem createFileSystem() { + return createFileSystem(null); + } + + public static FileSystem createFileSystem(Configuration configuration) { + try { + if (ObjectUtil.isNull(configuration)) { + return FileSystem.get(new Configuration()); + } + return FileSystem.get(configuration); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static ImmutableList list(FileSystem fileSystem, Path path) { + try { + return Lists.immutable.of(fileSystem.listStatus(path)); + } catch (IOException e) { + logger.error("List error", e); + return Lists.immutable.empty(); + } + } +} diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/JacksonUtil.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/JacksonUtil.java new file mode 100644 index 0000000..913391e --- /dev/null +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/utils/JacksonUtil.java @@ -0,0 +1,30 @@ +package com.lanyuanxiaoyao.service.launcher.utils; + +import 
cn.hutool.core.util.ObjectUtil; +import java.io.Serializable; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Json 解析相关工具 + * + * @author ZhangJiacheng + * @date 2022-06-12 + */ +public class JacksonUtil implements Serializable { + private static final Logger logger = LoggerFactory.getLogger(JacksonUtil.class); + + private static ObjectMapper INSTANCE = null; + + public static ObjectMapper getMapper() { + if (ObjectUtil.isNull(INSTANCE)) { + INSTANCE = new ObjectMapper(); + INSTANCE.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true); + INSTANCE.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + return INSTANCE; + } +} diff --git a/service-launcher/src/main/resources/application.yml b/service-launcher/src/main/resources/application.yml new file mode 100644 index 0000000..d81e0f4 --- /dev/null +++ b/service-launcher/src/main/resources/application.yml @@ -0,0 +1,7 @@ +spring: + application: + name: service-launcher + profiles: + include: random-port,common,discovery,metrics,forest +forest: + backend: httpclient \ No newline at end of file diff --git a/service-launcher/src/main/resources/logback-spring.xml b/service-launcher/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..53f79f0 --- /dev/null +++ b/service-launcher/src/main/resources/logback-spring.xml @@ -0,0 +1,53 @@ + + + + + + + + + + + + true + + ${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push} + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx + + true + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %clr(%5p) %clr([${HOSTNAME}]){yellow} %clr([%t]){magenta} %clr(%logger{40}){cyan} #@# %m%n%wEx + + + + + 
${LOGGING_PARENT:-.}/${APP_NAME:-run}.log + + ${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz + 7 + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx + + + + + + + + + + + + \ No newline at end of file diff --git a/service-launcher/src/main/resources/services/org.apache.flink.client.deployment.ClusterClientFactory b/service-launcher/src/main/resources/services/org.apache.flink.client.deployment.ClusterClientFactory new file mode 100644 index 0000000..ea1c4de --- /dev/null +++ b/service-launcher/src/main/resources/services/org.apache.flink.client.deployment.ClusterClientFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.flink.yarn.YarnClusterClientFactory \ No newline at end of file diff --git a/service-launcher/src/main/resources/services/org.apache.flink.core.execution.PipelineExecutorFactory b/service-launcher/src/main/resources/services/org.apache.flink.core.execution.PipelineExecutorFactory new file mode 100644 index 0000000..d56f8c5 --- /dev/null +++ b/service-launcher/src/main/resources/services/org.apache.flink.core.execution.PipelineExecutorFactory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory +org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory \ No newline at end of file diff --git a/service-launcher/src/main/resources/services/org.apache.flink.metrics.reporter.MetricReporterFactory b/service-launcher/src/main/resources/services/org.apache.flink.metrics.reporter.MetricReporterFactory new file mode 100644 index 0000000..e16bdb6 --- /dev/null +++ b/service-launcher/src/main/resources/services/org.apache.flink.metrics.reporter.MetricReporterFactory @@ -0,0 +1 @@ +com.eshore.odcp.hudi.connector.utils.executor.metrics.VictoriaMetricsReporterFactory \ No newline at end of file diff --git a/service-launcher/src/main/resources/services/org.apache.flink.table.factories.Factory b/service-launcher/src/main/resources/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000..ac5f9b6 --- /dev/null +++ b/service-launcher/src/main/resources/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.flink.table.planner.delegation.DefaultExecutorFactory +org.apache.flink.table.planner.delegation.DefaultParserFactory +org.apache.flink.table.planner.delegation.DefaultPlannerFactory