diff --git a/bin/build-all.sh b/bin/build-all.sh
index 8235915..89704b5 100755
--- a/bin/build-all.sh
+++ b/bin/build-all.sh
@@ -3,7 +3,7 @@
 root_path=$(dirname $(cd $(dirname $0);pwd))
 source $root_path/bin/library.sh
 mvn deploy -N -D skipTests -P local -s ~/.m2/settings-development.xml
 deploy service-common service-dependencies service-configuration service-forest service-cli service-cli/service-cli-core service-executor service-executor/service-executor-core utils/executor
-package service-api service-check service-cli/service-cli-runner service-cloud-query service-executor/service-executor-manager service-executor/service-executor-task service-command service-exporter service-flink-query service-gateway service-hudi-query service-info-query service-monitor service-loki-query service-pulsar-query service-queue service-scheduler service-uploader service-web service-yarn-query service-zookeeper-query utils/sync
+package service-api service-check service-cli/service-cli-runner service-cloud-query service-executor/service-executor-manager service-executor/service-executor-task service-command service-command-pro service-exporter service-flink-query service-gateway service-hudi-query service-info-query service-monitor service-loki-query service-pulsar-query service-queue service-scheduler service-uploader service-web service-yarn-query service-zookeeper-query utils/sync
 configs=(b2a4 b2b1 b2b5 b2b12)
 for config in ${configs[*]};
@@ -16,6 +16,7 @@ upload $root_path/service-api/target/service-api-1.0.0-SNAPSHOT.jar
 upload $root_path/service-check/target/service-check-1.0.0-SNAPSHOT.jar
 upload $root_path/service-cloud-query/target/service-cloud-query-1.0.0-SNAPSHOT.jar
 upload $root_path/service-command/target/service-command-1.0.0-SNAPSHOT.jar
+upload $root_path/service-command-pro/target/service-command-pro-1.0.0-SNAPSHOT.jar
 upload $root_path/service-executor/service-executor-manager/target/service-executor-manager-1.0.0-SNAPSHOT.jar
 upload $root_path/service-executor/service-executor-task/target/service-executor-task-1.0.0-SNAPSHOT.jar
 upload $root_path/service-exporter/target/service-exporter-1.0.0-SNAPSHOT.jar
diff --git a/bin/build-command-pro.sh b/bin/build-command-pro.sh
new file mode 100755
index 0000000..0b5b1bb
--- /dev/null
+++ b/bin/build-command-pro.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+root_path=$(dirname $(cd $(dirname $0);pwd))
+source $root_path/bin/library.sh
+deploy service-common service-dependencies service-configuration service-forest
+package service-command-pro
+upload $root_path/service-command-pro/target/service-command-pro-1.0.0-SNAPSHOT.jar
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 2df2773..07e11e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,6 +19,7 @@
         <module>service-cli</module>
         <module>service-cloud-query</module>
         <module>service-command</module>
+        <module>service-command-pro</module>
         <module>service-executor</module>
         <module>service-exporter</module>
         <module>service-flink-query</module>
@@ -123,6 +124,11 @@
                 <artifactId>executor</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.lanyuanxiaoyao</groupId>
+                <artifactId>sync</artifactId>
+                <version>${project.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/DeployInformationProperties.java b/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/DeployInformationProperties.java
index 95640a6..08a1bc8 100644
--- a/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/DeployInformationProperties.java
+++ b/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/DeployInformationProperties.java
@@ -77,6 +77,7 @@ public class DeployInformationProperties {
     public static final class Generate {
         private Boolean cloud = true;
         private Boolean command = true;
+        private Boolean commandPro = true;
         private Boolean uploader = true;
         private Boolean updateJar = true;
 
@@ -96,6 +97,14 @@ public class DeployInformationProperties {
             this.command = command;
         }
 
+        public Boolean getCommandPro() {
+            return commandPro;
+        }
+
+        public void setCommandPro(Boolean commandPro) {
+            this.commandPro = commandPro;
+        }
+
         public Boolean getUploader() {
             return uploader;
         }
@@ -117,6 +126,7 @@ public class DeployInformationProperties {
             return "Generate{" +
                     "cloud=" + cloud +
                     ", command=" + command +
+                    ", commandPro=" + commandPro +
                     ", uploader=" + uploader +
                     ", updateJar=" + updateJar +
                     '}';
diff --git a/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/RunnerApplication.java b/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/RunnerApplication.java
index a80d553..9133888 100644
--- a/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/RunnerApplication.java
+++ b/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/RunnerApplication.java
@@ -1,7 +1,6 @@
 package com.lanyuanxiaoyao.service.cli.runner;
 
 import cn.hutool.core.collection.ListUtil;
-import cn.hutool.core.io.IoUtil;
 import cn.hutool.core.map.MapUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.RandomUtil;
@@ -34,7 +33,6 @@ import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.core.io.ClassPathResource;
 
 /**
  * Startup class
@@ -98,6 +96,8 @@ public class RunnerApplication implements ApplicationRunner {
             generateCloud(Paths.get("cloud"));
         if (deployInformationProperties.getGenerate().getCommand())
             generateCommand(Paths.get("command"));
+        if (deployInformationProperties.getGenerate().getCommandPro())
+            generateCommand(Paths.get("command-pro"));
         if (deployInformationProperties.getGenerate().getUploader())
             generateUploader(Paths.get("uploader"));
         if (deployInformationProperties.getGenerate().getUpdateJar())
@@ -252,7 +252,7 @@
         logger.info("Current path: {}", absolutRootPath);
         TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig("template", TemplateConfig.ResourceMode.CLASSPATH));
 
-        Template commandTemplate = engine.getTemplate("command/cli.ftl");
+        Template commandTemplate = engine.getTemplate(root.toFile().getName() + "/cli.ftl");
         String commandScript = commandTemplate.render(MapUtil.builder()
                 .put("currentPath", absolutRootPath)
                 .put("runtime", runtimeInfo)
@@ -262,7 +262,7 @@
         Files.deleteIfExists(commandScriptFile);
         Files.write(commandScriptFile, commandScript.getBytes());
 
-        Template commandDirectlyTemplate = engine.getTemplate("command/cli.ftl");
+        Template commandDirectlyTemplate = engine.getTemplate(root.toFile().getName() + "/cli.ftl");
         String commandDirectlyScript = commandDirectlyTemplate.render(MapUtil.builder()
                 .put("currentPath", absolutRootPath)
                 .put("runtime", runtimeInfo)
@@ -272,7 +272,7 @@
         Files.deleteIfExists(commandDirectlyScriptFile);
         Files.write(commandDirectlyScriptFile, commandDirectlyScript.getBytes());
 
-        Template updateTemplate = engine.getTemplate("command/update.ftl");
+        Template updateTemplate = engine.getTemplate(root.toFile().getName() + "/update.ftl");
         String updateScript = updateTemplate.render(MapUtil.builder()
                 .put("currentPath", absolutRootPath)
                 .put("runtime", runtimeInfo)
diff --git a/service-cli/service-cli-runner/src/main/resources/template/command-pro/cli.ftl b/service-cli/service-cli-runner/src/main/resources/template/command-pro/cli.ftl
new file mode 100644
index 0000000..88febf2
--- /dev/null
+++ b/service-cli/service-cli-runner/src/main/resources/template/command-pro/cli.ftl
@@ -0,0 +1,4 @@
+#!/bin/bash
+mkdir -p ${runtime.jarPath}
+export JASYPT_ENCRYPTOR_PASSWORD='r#(R,P\"Dp^A47>WSn:Wn].gs/+\"v:q_Q*An~zF*g-@j@jtSTv5H/,S-3:R?r9R}.'
+${runtime.jdkPath}/bin/java <#noparse>-Ddeploy.datetime=$(date +%Y%m%d%H%M%S) -Ddeploy.hostname=$(hostname) -Ddeploy.hostname-full=$(hostname -f) -Ddeploy.start-time=$(date +%Y%m%d%H%M%S) -Dlogging.parent=${runtime.logPath} -Dloki.url=${runtime.loki.servicePushUrl} -Dspring.cloud.zookeeper.connect-string=${runtime.zkUrl} -Dyarn-cluster.sync-clusters=${runtime.yarn.syncClusters} -Dyarn-cluster.compaction-clusters=${runtime.yarn.compactionClusters} -jar ${runtime.jarPath}/service-command-pro.jar<#if directly> $@
diff --git a/service-cli/service-cli-runner/src/main/resources/template/command-pro/update.ftl b/service-cli/service-cli-runner/src/main/resources/template/command-pro/update.ftl
new file mode 100644
index 0000000..62d81a5
--- /dev/null
+++ b/service-cli/service-cli-runner/src/main/resources/template/command-pro/update.ftl
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+curl ${runtime.downloadUrl}/service-command-pro-1.0.0-SNAPSHOT.jar -o ${runtime.jarPath}/service-command-pro.jar
\ No newline at end of file
diff --git a/service-command-pro/pom.xml b/service-command-pro/pom.xml
new file mode 100644
index 0000000..628f31a
--- /dev/null
+++ b/service-command-pro/pom.xml
@@ -0,0 +1,191 @@
+ + 4.0.0 + + com.lanyuanxiaoyao + hudi-service + 1.0.0-SNAPSHOT + + + service-command-pro + + + + com.lanyuanxiaoyao + service-forest + + + org.springframework.cloud + spring-cloud-starter-loadbalancer + + + org.springframework.boot + spring-boot-starter-web + + + com.google.protobuf + protobuf-java + + + + + org.springframework.shell + spring-shell-starter + + + me.tongfei + progressbar + + + com.sun.jersey + jersey-client + + + com.sun.jersey + jersey-core + + + com.sun.jersey.contribs + jersey-apache-client4 + + + org.apache.hadoop + hadoop-client + + + com.squareup.okio + okio + + + com.google.guava + guava + + + commons-io + commons-io + + + commons-logging + commons-logging + + + org.apache.curator + curator-client + + + + + org.apache.hudi + hudi-flink${flink.major.version}-bundle + + io.juicefs + juicefs-hadoop + + org.apache.curator + curator-framework + + log4j + log4j + + netty + io.netty + + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + copy-config-file + validate + + copy-resources + + + ${project.build.directory}/classes + + + ${project.parent.basedir}/config/${build-tag} + + *.xml + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + false + true + + + META-INF/spring.handlers + + + META-INF/spring.factories + + + META-INF/spring.schemas + + + com.lanyuanxiaoyao.service.command.pro.CommandProApplication + + + reference.conf + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + core-default.xml + hdfs-default.xml + yarn-default.xml + log4j-surefire*.properties + + + + + + + package + + shade + + + + + + org.springframework.boot + spring-boot-maven-plugin +
${spring-boot.version} + + + + + + \ No newline at end of file
diff --git a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/CommandProApplication.java b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/CommandProApplication.java
new file mode 100644
index 0000000..c8db3c2
--- /dev/null
+++ b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/CommandProApplication.java
@@ -0,0 +1,43 @@
+package com.lanyuanxiaoyao.service.command.pro;
+
+import com.lanyuanxiaoyao.service.configuration.SecurityConfig;
+import com.lanyuanxiaoyao.service.forest.configuration.SpringCloudDiscoveryInterceptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
+
+/**
+ * Command-line tool entry point
+ *
+ * @author ZhangJiacheng
+ * @date 2022-03-24
+ */
+@SpringBootApplication
+@EnableConfigurationProperties
+@ComponentScan(
+        basePackages = {"com.lanyuanxiaoyao.service"},
+        excludeFilters = {
+                @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {
+                        SpringCloudDiscoveryInterceptor.class,
+                        SecurityConfig.class
+                }),
+        }
+)
+public class CommandProApplication implements ApplicationRunner {
+    private static final Logger logger = LoggerFactory.getLogger(CommandProApplication.class);
+
+    public static void main(String[] args) {
+        SpringApplication.run(CommandProApplication.class, args);
+    }
+
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+        // Test
+    }
+}
diff --git a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java
new file mode 100644
index 0000000..93f02e9
--- /dev/null
+++ b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java
@@ -0,0 +1,21 @@
+package com.lanyuanxiaoyao.service.command.pro.commands;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.shell.standard.ShellComponent;
+import org.springframework.shell.standard.ShellMethod;
+
+/**
+ * Hudi-related operations
+ *
+ * @author lanyuanxiaoyao
+ * @date 2024-03-19
+ */
+@ShellComponent("Hudi相关操作")
+public class HudiCommand {
+    private static final Logger logger = LoggerFactory.getLogger(HudiCommand.class);
+
+    @ShellMethod("Test")
+    public void test() {
+    }
+}
diff --git a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/configuration/ShellConfiguration.java b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/configuration/ShellConfiguration.java
new file mode 100644
index 0000000..3946b9e
--- /dev/null
+++ b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/configuration/ShellConfiguration.java
@@ -0,0 +1,19 @@
+package com.lanyuanxiaoyao.service.command.pro.configuration;
+
+import org.jline.utils.AttributedString;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.shell.jline.PromptProvider;
+
+/**
+ * Spring Shell configuration
+ *
+ * @author ZhangJiacheng
+ * @date 2022-04-27
+ */
+@Configuration
+public class ShellConfiguration implements PromptProvider {
+    @Override
+    public AttributedString getPrompt() {
+        return new AttributedString("hudi-pro:->");
+    }
+}
diff --git a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/configuration/SpringCloudDiscoveryInterceptor.java b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/configuration/SpringCloudDiscoveryInterceptor.java
new file mode 100644
index 0000000..263eda4
--- /dev/null
+++ b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/configuration/SpringCloudDiscoveryInterceptor.java
@@ -0,0 +1,57 @@
+package com.lanyuanxiaoyao.service.command.pro.configuration;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.RandomUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.util.URLUtil;
+import com.dtflys.forest.auth.BasicAuth;
+import com.dtflys.forest.http.ForestAddress;
+import com.dtflys.forest.http.ForestRequest;
+import com.dtflys.forest.interceptor.Interceptor;
+import com.lanyuanxiaoyao.service.common.Constants;
+import java.net.URI;
+import java.net.URL;
+import org.eclipse.collections.api.factory.Lists;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author lanyuanxiaoyao
+ * @date 2023-04-24
+ */
+@Component
+public class SpringCloudDiscoveryInterceptor implements Interceptor {
+    private static final Logger logger = LoggerFactory.getLogger(SpringCloudDiscoveryInterceptor.class);
+
+    private final DiscoveryClient discoveryClient;
+
+    public SpringCloudDiscoveryInterceptor(DiscoveryClient discoveryClient) {
+        logger.info("Services: {}", discoveryClient.getServices());
+        this.discoveryClient = discoveryClient;
+    }
+
+    @Override
+    public boolean beforeExecute(ForestRequest request) {
+        // Load
+        URL url = URLUtil.url(request.getUrl());
+        String host = url.getHost();
+        if (StrUtil.isNotBlank(host)) {
+            ImmutableList<String> urls = Lists.immutable.ofAll(discoveryClient.getInstances(host))
+                    .collect(instance -> instance.getUri().toString());
+            if (ObjectUtil.isNotEmpty(urls)) {
+                String targetUrl = urls.get(RandomUtil.randomInt(urls.size()));
+                URI uri = URI.create(targetUrl);
+                request.setAddress(new ForestAddress(uri.getScheme(), uri.getHost(), uri.getPort()));
+            }
+        }
+
+        // Basic auth
+        BasicAuth basicAuth = new BasicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN);
+        basicAuth.enhanceAuthorization(request);
+
+        return Interceptor.super.beforeExecute(request);
+    }
+}
diff --git a/service-command-pro/src/main/resources/application.yml b/service-command-pro/src/main/resources/application.yml
new file mode 100644
index 0000000..de79e10
--- /dev/null
+++ b/service-command-pro/src/main/resources/application.yml
@@ -0,0 +1,20 @@
+spring:
+  application:
+    name: service-command-pro
+  profiles:
+    include: common,discovery
+  main:
+    web-application-type: none
+  shell:
+    interactive:
+      enabled: true
+    command:
+      script:
+        enabled: false
+      history:
+        enabled: false
+forest:
+  backend: httpclient
+  timeout: 120000
+  log-enabled: false
+  interceptors: com.lanyuanxiaoyao.service.command.pro.configuration.SpringCloudDiscoveryInterceptor
\ No newline at end of file
diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/ShellConfiguration.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/ShellConfiguration.java
index 631a56d..f4232e0 100644
--- a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/ShellConfiguration.java
+++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/ShellConfiguration.java
@@ -14,6 +14,6 @@ import org.springframework.shell.jline.PromptProvider;
 public class ShellConfiguration implements PromptProvider {
     @Override
     public AttributedString getPrompt() {
-        return new AttributedString("odcp-utils:->");
+        return new AttributedString("hudi:->");
     }
 }
diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java
index 28dc799..63d91e6 100644
--- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java
+++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java
@@ -96,7 +96,7 @@ public class Compactor {
         }
         GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta);
 
-        Configuration configuration = SyncUtils.getCompactionFlinkConfiguration(
+        Configuration configuration = ConfigurationUtils.getCompactionFlinkConfiguration(
                 globalConfiguration,
                 new Configuration(),
                 flinkJob,
diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java
new file mode 100644
index 0000000..d22629c
--- /dev/null
+++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java
@@ -0,0 +1,196 @@
+package com.lanyuanxiaoyao.service.sync.utils;
+
+import cn.hutool.core.collection.ListUtil;
+import cn.hutool.core.util.EnumUtil;
+import cn.hutool.core.util.StrUtil;
+import com.lanyuanxiaoyao.service.common.Constants;
+import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
+import com.lanyuanxiaoyao.service.common.entity.TableMeta;
+import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
+import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.client.TraceWriteStatus;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsVictoriaConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.DefaultPartitionNameKeyGenerator;
+import org.apache.hudi.metrics.MetricsReporterType;
+import org.apache.hudi.org.apache.avro.Schema;
+import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.lanyuanxiaoyao.service.common.Constants.HOUR;
+
+/**
+ * @author lanyuanxiaoyao
+ * @date 2024-05-10
+ */
+public class ConfigurationUtils {
+    private static final Logger logger = LoggerFactory.getLogger(ConfigurationUtils.class);
+
+    public static Configuration getSyncFlinkConfiguration(GlobalConfiguration globalConfiguration, Configuration inputConfiguration, FlinkJob flinkJob, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
+        Configuration configuration = new Configuration();
+        if (inputConfiguration != null) {
+            configuration = inputConfiguration;
+        }
+
+        configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), true);
+        configuration.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name());
+        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl());
+        configuration.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000);
+        configuration.setBoolean(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_ENABLE.key(), true);
+        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_USERNAME.key(), Constants.VICTORIA_USERNAME);
+        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_PASSWORD.key(), Constants.VICTORIA_PASSWORD);
+        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_TAGS.key(), ListUtil.toList(
+                Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_SYNC),
+                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
+                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJob.getName().replaceAll("\\s", "_")),
+                Pair.of(Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()),
+                Pair.of(Constants.METRICS_LABEL_TABLE, tableMeta.getTable()),
+                Pair.of(Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias())
+        ).stream().map(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).collect(Collectors.joining(";")));
+
+        return getFlinkConfiguration(configuration, tableMeta, schema, defaultParallelism);
+    }
+
+    public static Configuration getCompactionFlinkConfiguration(GlobalConfiguration globalConfiguration, Configuration inputConfiguration, FlinkJob flinkJob, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
+        Configuration configuration = new Configuration();
+        if (inputConfiguration != null) {
+            configuration = inputConfiguration;
+        }
+
+        configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), true);
+        configuration.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name());
+        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl());
+        configuration.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000);
+        configuration.setBoolean(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_ENABLE.key(), true);
+        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_USERNAME.key(), Constants.VICTORIA_USERNAME);
+        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_PASSWORD.key(), Constants.VICTORIA_PASSWORD);
+        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_TAGS.key(), ListUtil.toList(
+                Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_COMPACTION),
+                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
+                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJob.getName().replaceAll("\\s", "_")),
+                Pair.of(Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()),
+                Pair.of(Constants.METRICS_LABEL_TABLE, tableMeta.getTable()),
+                Pair.of(Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias())
+        ).stream().map(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).collect(Collectors.joining(";")));
+
+        return getFlinkConfiguration(configuration, tableMeta, schema, defaultParallelism);
+    }
+
+    public static Configuration getFlinkConfiguration(Configuration inputConfiguration, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
+        Configuration configuration = new Configuration();
+        if (inputConfiguration != null) {
+            configuration = inputConfiguration;
+        }
+        String tableType = tableMeta.getHudi().getTargetTableType();
+        logger.info("Hudi table type: {}", tableMeta.getHudi().getTargetTableType());
+        // Basic information
+        configuration.setString(FlinkOptions.TABLE_NAME, tableMeta.getHudi().getTargetTable());
+        configuration.setString(FlinkOptions.TABLE_TYPE, tableType);
+        configuration.setString(FlinkOptions.PATH, tableMeta.getHudi().getTargetHdfsPath());
+        configuration.setString(FlinkOptions.RECORD_KEY_FIELD, Constants.UNION_KEY_NAME);
+        configuration.setBoolean(FlinkOptions.PRE_COMBINE, false);
+        if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_PRE_COMBINE)) {
+            configuration.setBoolean(FlinkOptions.PRE_COMBINE, true);
+        }
+        configuration.setString(FlinkOptions.PRECOMBINE_FIELD, Constants.UPDATE_TIMESTAMP_KEY_NAME);
+        configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, schema.toString());
+
+        if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_NO_IGNORE_FAILED)) {
+            configuration.setBoolean(FlinkOptions.IGNORE_FAILED, false);
+        }
+
+        if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_USE_HFLUSH)) {
+            logger.info("Enable hflush");
+            configuration.setBoolean(HoodieWriteConfig.USE_HFLUSH.key(), true);
+        }
+
+        configuration.setString(FlinkOptions.PARTITION_DEFAULT_NAME, "default");
+        configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, DefaultPartitionNameKeyGenerator.class.getName());
+
+        Optional<String> partitionPath = TableMetaHelper.getPartitionField(tableMeta);
+        logger.info("Partition field: {}", partitionPath.orElse(""));
+        if (partitionPath.isPresent()) {
+            configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPath.get());
+        }
+
+        configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName());
+        configuration.setString(HoodieWriteConfig.WRITE_STATUS_CLASS_NAME.key(), TraceWriteStatus.class.getName());
+
+        configuration.setBoolean(FlinkOptions.METADATA_ENABLED, false);
+        configuration.setInteger(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), Integer.MAX_VALUE);
+        configuration.setString(FileSystemViewStorageConfig.SECONDARY_VIEW_TYPE.key(), FileSystemViewStorageType.SPILLABLE_DISK.name());
+
+        // Write
+        configuration.setInteger(FlinkOptions.WRITE_TASKS, tableMeta.getHudi().getWriteTasks() == 0 ? defaultParallelism : tableMeta.getHudi().getWriteTasks());
+        configuration.setInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY, 0);
+        configuration.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, tableMeta.getHudi().getWriteTaskMaxMemory() == 0 ? FlinkOptions.WRITE_TASK_MAX_SIZE.defaultValue() : tableMeta.getHudi().getWriteTaskMaxMemory());
+        configuration.setDouble(FlinkOptions.WRITE_BATCH_SIZE, tableMeta.getHudi().getWriteBatchSize() == 0 ? FlinkOptions.WRITE_BATCH_SIZE.defaultValue() : tableMeta.getHudi().getWriteBatchSize());
+        configuration.setLong(FlinkOptions.WRITE_RATE_LIMIT, tableMeta.getHudi().getWriteRateLimit());
+        configuration.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, HOUR);
+
+        // Index
+        configuration.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+        configuration.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, tableMeta.getHudi().getBucketIndexNumber() == 0 ? 50 : tableMeta.getHudi().getBucketIndexNumber());
+        configuration.setString(FlinkOptions.INDEX_KEY_FIELD, Constants.UNION_KEY_NAME);
+
+        configuration.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false);
+        configuration.setBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED, false);
+        configuration.setDouble(FlinkOptions.INDEX_STATE_TTL, -1);
+
+        // Increasing this causes OOM
+        // configuration.setDouble(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 64 * M);
+        // Increasing this causes OOM
+        // configuration.setDouble(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), 128 * M);
+
+        // Compaction
+        configuration.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+        if (EnumUtil.equals(HoodieTableType.COPY_ON_WRITE, tableType)) {
+            configuration.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+        }
+        configuration.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
+        configuration.setInteger(FlinkOptions.COMPACTION_TASKS, tableMeta.getHudi().getCompactionTasks());
+        configuration.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, StrUtil.isBlank(tableMeta.getHudi().getCompactionStrategy()) ? FlinkOptions.NUM_OR_TIME : tableMeta.getHudi().getCompactionStrategy());
+        configuration.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
+        configuration.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, tableMeta.getHudi().getCompactionDeltaSeconds() == 0 ? 15 * 60 : tableMeta.getHudi().getCompactionDeltaSeconds());
+        configuration.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, tableMeta.getHudi().getCompactionDeltaCommits() == 0 ? 5 : tableMeta.getHudi().getCompactionDeltaCommits());
+
+        configuration.setString(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), UnBoundedCompactionStrategy.class.getName());
+        // configuration.setString(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), CombineAllCompactionStrategy.class.getName());
+        // configuration.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
+        // configuration.setInteger(FlinkOptions.COMPACTION_TASKS, tableMeta.getHudi().getCompactionTasks() == 0 ? defaultParallelism : tableMeta.getHudi().getCompactionTasks());
+        // configuration.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, tableMeta.getHudi().getCompactionMaxMemory());
+        // configuration.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, tableMeta.getHudi().getCompactionStrategy());
+        // configuration.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, tableMeta.getHudi().getCompactionDeltaCommits());
+        // configuration.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, tableMeta.getHudi().getCompactionDeltaSeconds());
+
+        // Number of timeline commits to retain
+        configuration.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, tableMeta.getHudi().getKeepCommitVersion());
+        // Minimum number of archived commits to retain; must be larger than the previous parameter
+        configuration.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, tableMeta.getHudi().getKeepCommitVersion() + 50);
+        // Maximum number of archived commits to retain; must be larger than the previous parameter
+        configuration.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, tableMeta.getHudi().getKeepCommitVersion() + 100);
+        // Number of versions to keep for log files and data files
+        configuration.setString(FlinkOptions.CLEAN_POLICY, HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name());
+        configuration.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, tableMeta.getHudi().getKeepFileVersion());
+
+        // Disable a built-in HTTP service
+        // configuration.setBoolean(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(), false);
+
+        return configuration;
+    }
+}
diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java
index 3201e50..ddaae00 100644
--- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java
+++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java
@@ -70,159 +70,6 @@ public class SyncUtils {
 
         return TypeConverter.getInstance(meta).convertToSchema(meta);
     }
-
-    public static Configuration getSyncFlinkConfiguration(GlobalConfiguration globalConfiguration, Configuration inputConfiguration, FlinkJob flinkJob, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
-        Configuration configuration = new Configuration();
-        if (inputConfiguration != null) {
-            configuration = inputConfiguration;
-        }
-
-        configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), true);
-        configuration.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name());
-        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl());
-        configuration.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000);
-        configuration.setBoolean(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_ENABLE.key(), true);
-        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_USERNAME.key(), Constants.VICTORIA_USERNAME);
-        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_PASSWORD.key(), Constants.VICTORIA_PASSWORD);
-        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_TAGS.key(), ListUtil.toList(
-                Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_SYNC),
-                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
-                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJob.getName().replaceAll("\\s", "_")),
-                Pair.of(Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()),
-                Pair.of(Constants.METRICS_LABEL_TABLE, tableMeta.getTable()),
-                Pair.of(Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias())
-        ).stream().map(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).collect(Collectors.joining(";")));
-
-        return getFlinkConfiguration(configuration, tableMeta, schema, defaultParallelism);
-    }
-
-    public static Configuration getCompactionFlinkConfiguration(GlobalConfiguration globalConfiguration, Configuration inputConfiguration, FlinkJob flinkJob, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
-        Configuration configuration = new Configuration();
-        if (inputConfiguration != null) {
-            configuration = inputConfiguration;
-        }
-
-        configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), true);
-        configuration.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name());
-        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl());
-        configuration.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000);
-        configuration.setBoolean(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_ENABLE.key(), true);
-        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_USERNAME.key(), Constants.VICTORIA_USERNAME);
-        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_PASSWORD.key(), Constants.VICTORIA_PASSWORD);
-        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_TAGS.key(), ListUtil.toList(
-                Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_COMPACTION),
-                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
-                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJob.getName().replaceAll("\\s", "_")),
-                Pair.of(Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()),
-                Pair.of(Constants.METRICS_LABEL_TABLE, tableMeta.getTable()),
-                Pair.of(Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias())
-        ).stream().map(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).collect(Collectors.joining(";")));
-
-        return getFlinkConfiguration(configuration, tableMeta, schema, defaultParallelism);
-    }
-
-    public static Configuration getFlinkConfiguration(Configuration inputConfiguration, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
-        Configuration configuration = new Configuration();
-        if (inputConfiguration != null) {
-            configuration = inputConfiguration;
-        }
-        String tableType = tableMeta.getHudi().getTargetTableType();
-        logger.info("Hudi table type: {}", tableMeta.getHudi().getTargetTableType());
-        // Basic information
-        configuration.setString(FlinkOptions.TABLE_NAME, tableMeta.getHudi().getTargetTable());
-        configuration.setString(FlinkOptions.TABLE_TYPE, tableType);
-        configuration.setString(FlinkOptions.PATH, tableMeta.getHudi().getTargetHdfsPath());
-        configuration.setString(FlinkOptions.RECORD_KEY_FIELD, Constants.UNION_KEY_NAME);
-        configuration.setBoolean(FlinkOptions.PRE_COMBINE, false);
-        if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_PRE_COMBINE)) {
-            configuration.setBoolean(FlinkOptions.PRE_COMBINE, true);
-        }
-        configuration.setString(FlinkOptions.PRECOMBINE_FIELD, Constants.UPDATE_TIMESTAMP_KEY_NAME);
-        configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, schema.toString());
-
-        if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_NO_IGNORE_FAILED)) {
-            configuration.setBoolean(FlinkOptions.IGNORE_FAILED, false);
-        }
-
-        if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_USE_HFLUSH)) {
-            logger.info("Enable hflush");
-            configuration.setBoolean(HoodieWriteConfig.USE_HFLUSH.key(), true);
-        }
-
-        configuration.setString(FlinkOptions.PARTITION_DEFAULT_NAME, "default");
-        configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, DefaultPartitionNameKeyGenerator.class.getName());
-
-        Optional<String> partitionPath = TableMetaHelper.getPartitionField(tableMeta);
-        logger.info("Partition field: {}", partitionPath.orElse(""));
-        if (partitionPath.isPresent()) {
-            configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPath.get());
-        }
-
-        configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName());
-        configuration.setString(HoodieWriteConfig.WRITE_STATUS_CLASS_NAME.key(), TraceWriteStatus.class.getName());
-
-        configuration.setBoolean(FlinkOptions.METADATA_ENABLED, false);
-        configuration.setInteger(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), Integer.MAX_VALUE);
-        configuration.setString(FileSystemViewStorageConfig.SECONDARY_VIEW_TYPE.key(), FileSystemViewStorageType.SPILLABLE_DISK.name());
-
-        // Write
-        configuration.setInteger(FlinkOptions.WRITE_TASKS, tableMeta.getHudi().getWriteTasks() == 0 ? defaultParallelism : tableMeta.getHudi().getWriteTasks());
-        configuration.setInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY, 0);
-        configuration.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, tableMeta.getHudi().getWriteTaskMaxMemory() == 0 ? FlinkOptions.WRITE_TASK_MAX_SIZE.defaultValue() : tableMeta.getHudi().getWriteTaskMaxMemory());
-        configuration.setDouble(FlinkOptions.WRITE_BATCH_SIZE, tableMeta.getHudi().getWriteBatchSize() == 0 ? FlinkOptions.WRITE_BATCH_SIZE.defaultValue() : tableMeta.getHudi().getWriteBatchSize());
-        configuration.setLong(FlinkOptions.WRITE_RATE_LIMIT, tableMeta.getHudi().getWriteRateLimit());
-        configuration.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, HOUR);
-
-        // Index
-        configuration.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
-        configuration.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, tableMeta.getHudi().getBucketIndexNumber() == 0 ? 50 : tableMeta.getHudi().getBucketIndexNumber());
-        configuration.setString(FlinkOptions.INDEX_KEY_FIELD, Constants.UNION_KEY_NAME);
-
-        configuration.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false);
-        configuration.setBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED, false);
-        configuration.setDouble(FlinkOptions.INDEX_STATE_TTL, -1);
-
-        // Increasing this causes OOM
-        // configuration.setDouble(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 64 * M);
-        // Increasing this causes OOM
-        // configuration.setDouble(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), 128 * M);
-
-        // Compaction
-        configuration.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
-        if (EnumUtil.equals(HoodieTableType.COPY_ON_WRITE, tableType)) {
-            configuration.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
-        }
-        configuration.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
-        configuration.setInteger(FlinkOptions.COMPACTION_TASKS, tableMeta.getHudi().getCompactionTasks());
-        configuration.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, StrUtil.isBlank(tableMeta.getHudi().getCompactionStrategy()) ? FlinkOptions.NUM_OR_TIME : tableMeta.getHudi().getCompactionStrategy());
-        configuration.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
-        configuration.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, tableMeta.getHudi().getCompactionDeltaSeconds() == 0 ? 15 * 60 : tableMeta.getHudi().getCompactionDeltaSeconds());
-        configuration.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, tableMeta.getHudi().getCompactionDeltaCommits() == 0 ? 5 : tableMeta.getHudi().getCompactionDeltaCommits());
-
-        configuration.setString(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), UnBoundedCompactionStrategy.class.getName());
-        // configuration.setString(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), CombineAllCompactionStrategy.class.getName());
-        // configuration.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
-        // configuration.setInteger(FlinkOptions.COMPACTION_TASKS, tableMeta.getHudi().getCompactionTasks() == 0 ? defaultParallelism : tableMeta.getHudi().getCompactionTasks());
-        // configuration.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, tableMeta.getHudi().getCompactionMaxMemory());
-        // configuration.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, tableMeta.getHudi().getCompactionStrategy());
-        // configuration.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, tableMeta.getHudi().getCompactionDeltaCommits());
-        // configuration.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, tableMeta.getHudi().getCompactionDeltaSeconds());
-
-        // Number of timeline commits to retain
-        configuration.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, tableMeta.getHudi().getKeepCommitVersion());
-        // Minimum number of archived commits to retain; must be larger than the previous parameter
-        configuration.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, tableMeta.getHudi().getKeepCommitVersion() + 50);
-        // Maximum number of archived commits to retain; must be larger than the previous parameter
-        configuration.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, tableMeta.getHudi().getKeepCommitVersion() + 100);
-        // Number of versions to keep for log files and data files
-        configuration.setString(FlinkOptions.CLEAN_POLICY, HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name());
-        configuration.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, tableMeta.getHudi().getKeepFileVersion());
-
-        // Disable a built-in HTTP service
-        // configuration.setBoolean(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(), false);
-
-        return configuration;
-    }
-
     public static void sinkToHoodieByTable(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta, StreamExecutionEnvironment environment, DataStream inputDataStream) {
         Schema schema = avroSchemaWithExtraFields(tableMeta);
         DataStream dataStream = inputDataStream
@@ -245,7 +92,7 @@ public class SyncUtils {
         );
         Configuration configuration = tableEnvironment.getConfig().getConfiguration();
         int parallelism = configuration.getInteger("parallelism", 1);
-        configuration = getSyncFlinkConfiguration(globalConfiguration, configuration, flinkJob, tableMeta, schema, parallelism);
+        configuration = ConfigurationUtils.getSyncFlinkConfiguration(globalConfiguration, configuration, flinkJob, tableMeta, schema, parallelism);
 
         DataStream hoodieRecordDataStream = Pipelines.bootstrap(configuration, rowType, parallelism, dataStream);
         DataStream pipeline = Pipelines.hoodieStreamWrite(configuration, parallelism, hoodieRecordDataStream);