diff --git a/database/table_tb_app_flink_job_config.sql b/database/table_tb_app_flink_job_config.sql index 65d4fb0..2bf03ef 100644 --- a/database/table_tb_app_flink_job_config.sql +++ b/database/table_tb_app_flink_job_config.sql @@ -7,5 +7,6 @@ CREATE TABLE `tb_app_flink_job_config` `application_id` varchar(50) DEFAULT NULL COMMENT '同步 yarn id', `run_mode` varchar(20) NOT NULL DEFAULT 'ALL_IN_ONE' COMMENT '运行模式', `one_in_one_yarn_job_id` bigint(20) DEFAULT NULL COMMENT 'ONE_IN_ONE yarn 配置', + `tags` varchar(500) NOT NULL DEFAULT '' COMMENT '标签', PRIMARY KEY (`id`) ) DEFAULT CHARSET = utf8mb4 COMMENT ='Flink 任务信息表'; diff --git a/service-cli/service-cli-core/src/main/java/com/lanyuanxiaoyao/service/cli/core/RuntimeInfo.java b/service-cli/service-cli-core/src/main/java/com/lanyuanxiaoyao/service/cli/core/RuntimeInfo.java index 00bff1b..28e1c58 100644 --- a/service-cli/service-cli-core/src/main/java/com/lanyuanxiaoyao/service/cli/core/RuntimeInfo.java +++ b/service-cli/service-cli-core/src/main/java/com/lanyuanxiaoyao/service/cli/core/RuntimeInfo.java @@ -264,6 +264,7 @@ public class RuntimeInfo { public static final class HudiInfo { private String appHdfsPath; + private String appTestHdfsPath; private String archiveHdfsPath; private String victoriaPushUrl; @@ -275,6 +276,14 @@ public class RuntimeInfo { this.appHdfsPath = appHdfsPath; } + public String getAppTestHdfsPath() { + return appTestHdfsPath; + } + + public void setAppTestHdfsPath(String appTestHdfsPath) { + this.appTestHdfsPath = appTestHdfsPath; + } + public String getArchiveHdfsPath() { return archiveHdfsPath; } @@ -295,6 +304,7 @@ public class RuntimeInfo { public String toString() { return "HudiInfo{" + "appHdfsPath='" + appHdfsPath + '\'' + + ", appTestHdfsPath='" + appTestHdfsPath + '\'' + ", archiveHdfsPath='" + archiveHdfsPath + '\'' + ", victoriaPushUrl='" + victoriaPushUrl + '\'' + '}'; diff --git a/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/RunnerApplication.java b/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/RunnerApplication.java index 428d491..6a91104 100644 --- a/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/RunnerApplication.java +++ b/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/RunnerApplication.java @@ -43,7 +43,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RunnerApplication implements ApplicationRunner { private static final Logger logger = LoggerFactory.getLogger(RunnerApplication.class); - + private static final TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig("template", TemplateConfig.ResourceMode.CLASSPATH)); private final DeployInformationProperties deployInformationProperties; private final RuntimeInfo runtimeInfo; private final ImmutableList serviceInfoList; @@ -71,6 +71,13 @@ public class RunnerApplication implements ApplicationRunner { SpringApplication.run(RunnerApplication.class, args); } + private static void generateTemplate(String templatePath, Map data, Path targetScriptPath) throws IOException { + Template template = engine.getTemplate(templatePath); + String script = template.render(data); + Files.deleteIfExists(targetScriptPath); + Files.write(targetScriptPath, script.getBytes()); + } + private List selectHosts(ServiceInfoWrapper serviceInfo) { return serviceInfo.getReplicas() == 0 ? hostInfoList @@ -111,15 +118,14 @@ public class RunnerApplication implements ApplicationRunner { String absolutRootPath = root.toAbsolutePath().toString(); logger.info("Current path: {}", absolutRootPath); - TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig("template", TemplateConfig.ResourceMode.CLASSPATH)); - Template syncTemplate = engine.getTemplate("check/check.ftl"); - String syncScript = syncTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("runtime", runtimeInfo) - .build()); - Path checkScriptFile = Paths.get(root.toString(), "check.sh"); - Files.deleteIfExists(checkScriptFile); - Files.write(checkScriptFile, syncScript.getBytes()); + generateTemplate( + "check/check.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("runtime", runtimeInfo) + .build(), + Paths.get(root.toString(), "check.sh") + ); } private void generateCloud(Path root) throws IOException { @@ -133,8 +139,6 @@ public class RunnerApplication implements ApplicationRunner { deployPlans = mapper.readValue(new String(Files.readAllBytes(planPath)), new TypeReference>>() {}); } - TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig("template", TemplateConfig.ResourceMode.CLASSPATH)); - Template deployTemplate = engine.getTemplate("cloud/deploy.ftl"); for (ServiceInfoWrapper serviceInfo : serviceInfoList) { logger.info("Generate script for {}", serviceInfo.getName()); List selectedHosts; @@ -148,71 +152,70 @@ public class RunnerApplication implements ApplicationRunner { selectedHosts = selectHosts(serviceInfo); deployPlans.put(serviceInfo.getName(), selectedHosts); } - String deployScript = deployTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("hosts", hostInfoList - .collect(HostInfoWrapper::getHostnameIp) - .toSortedList((o1, o2) -> Comparator.naturalOrder().compare(o1.getIp(), o2.getIp()))) - .put("selectedHosts", selectedHosts) - .put("runtime", runtimeInfo) - .put("info", serviceInfo) - .put("classpath", String.join(":", serviceInfo.getClasspath())) - .put("arguments", serviceInfo.getArguments()) - .put("environments", serviceInfo.getEnvironments()) - .build()); - Path deployScriptFile = Paths.get( - root.toString(), - StrUtil.format("deploy-{}.sh", serviceInfo.getName()) + generateTemplate( + "cloud/deploy.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("hosts", hostInfoList + .collect(HostInfoWrapper::getHostnameIp) + .toSortedList((o1, o2) -> Comparator.naturalOrder().compare(o1.getIp(), o2.getIp()))) + .put("selectedHosts", selectedHosts) + .put("runtime", runtimeInfo) + .put("info", serviceInfo) + .put("classpath", String.join(":", serviceInfo.getClasspath())) + .put("arguments", serviceInfo.getArguments()) + .put("environments", serviceInfo.getEnvironments()) + .build(), + Paths.get( + root.toString(), + StrUtil.format("deploy-{}.sh", serviceInfo.getName()) + ) ); - Files.deleteIfExists(deployScriptFile); - Files.write(deployScriptFile, deployScript.getBytes()); - - Template stopTemplate = engine.getTemplate("cloud/stop.ftl"); - String stopScript = stopTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("hosts", hostInfoList - .collect(HostInfoWrapper::getIp) - .toSortedList(Comparator.naturalOrder())) - .put("runtime", runtimeInfo) - .put("info", serviceInfo) - .put("arguments", serviceInfo.getArguments()) - .put("environments", serviceInfo.getEnvironments()) - .build()); - Path stopScriptFile = Paths.get( - root.toString(), - StrUtil.format("stop-{}.sh", serviceInfo.getName()) + generateTemplate( + "cloud/stop.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("hosts", hostInfoList + .collect(HostInfoWrapper::getIp) + .toSortedList(Comparator.naturalOrder())) + .put("runtime", runtimeInfo) + .put("info", serviceInfo) + .put("arguments", serviceInfo.getArguments()) + .put("environments", serviceInfo.getEnvironments()) + .build(), + Paths.get( + root.toString(), + StrUtil.format("stop-{}.sh", serviceInfo.getName()) + ) ); - Files.deleteIfExists(stopScriptFile); - Files.write(stopScriptFile, stopScript.getBytes()); - - Template logTemplate = engine.getTemplate("cloud/log.ftl"); - String logScript = logTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("hosts", hostInfoList - .collect(HostInfoWrapper::getIp) - .toSortedList(Comparator.naturalOrder())) - .put("selectedHosts", selectedHosts) - .put("runtime", runtimeInfo) - .put("info", serviceInfo) - .put("arguments", serviceInfo.getArguments()) - .put("environments", serviceInfo.getEnvironments()) - .build()); - Path logScriptFile = Paths.get( - root.toString(), - StrUtil.format("log-{}.sh", serviceInfo.getName()) + generateTemplate( + "cloud/log.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("hosts", hostInfoList + .collect(HostInfoWrapper::getIp) + .toSortedList(Comparator.naturalOrder())) + .put("selectedHosts", selectedHosts) + .put("runtime", runtimeInfo) + .put("info", serviceInfo) + .put("arguments", serviceInfo.getArguments()) + .put("environments", serviceInfo.getEnvironments()) + .build(), + Paths.get( + root.toString(), + StrUtil.format("log-{}.sh", serviceInfo.getName()) + ) ); - Files.deleteIfExists(logScriptFile); - Files.write(logScriptFile, logScript.getBytes()); } - Template stopTemplate = engine.getTemplate("cloud/stop-script.ftl"); - String stopScript = stopTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("runtime", runtimeInfo) - .build()); - Path stopScriptFile = Paths.get(root.toString(), "stop.sh"); - Files.deleteIfExists(stopScriptFile); - Files.write(stopScriptFile, stopScript.getBytes()); + generateTemplate( + "cloud/stop-script.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("runtime", runtimeInfo) + .build(), + Paths.get(root.toString(), "stop.sh") + ); MutableMap> groups = Maps.mutable.empty(); for (ServiceInfoWrapper service : serviceInfoList) { @@ -229,29 +232,28 @@ public class RunnerApplication implements ApplicationRunner { String group = entry.getKey(); MutableList infos = entry.getValue(); - Template batchDeployTemplate = engine.getTemplate("cloud/batch-deploy.ftl"); - String batchDeployScript = batchDeployTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("services", infos.collect(ServiceInfoWrapper::getName)) - .build()); - Path batchDeployScriptFile = Paths.get( - root.toString(), - StrUtil.format("batch-deploy-{}.sh", group) + generateTemplate( + "cloud/batch-deploy.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("services", infos.collect(ServiceInfoWrapper::getName)) + .build(), + Paths.get( + root.toString(), + StrUtil.format("batch-deploy-{}.sh", group) + ) ); - Files.deleteIfExists(batchDeployScriptFile); - Files.write(batchDeployScriptFile, batchDeployScript.getBytes()); - - Template batchStopTemplate = engine.getTemplate("cloud/batch-stop.ftl"); - String batchStopScript = batchStopTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("services", infos.collect(ServiceInfoWrapper::getName)) - .build()); - Path batchStopScriptFile = Paths.get( - root.toString(), - StrUtil.format("batch-stop-{}.sh", group) + generateTemplate( + "cloud/batch-stop.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("services", infos.collect(ServiceInfoWrapper::getName)) + .build(), + Paths.get( + root.toString(), + StrUtil.format("batch-stop-{}.sh", group) + ) ); - Files.deleteIfExists(batchStopScriptFile); - Files.write(batchStopScriptFile, batchStopScript.getBytes()); } Files.deleteIfExists(planPath); @@ -263,35 +265,32 @@ public class RunnerApplication implements ApplicationRunner { String absolutRootPath = root.toAbsolutePath().toString(); logger.info("Current path: {}", absolutRootPath); - TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig("template", TemplateConfig.ResourceMode.CLASSPATH)); - Template commandTemplate = engine.getTemplate(root.toFile().getName() + "/cli.ftl"); - String commandScript = commandTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("runtime", runtimeInfo) - .put("directly", false) - .build()); - Path commandScriptFile = Paths.get(root.toString(), "cli"); - Files.deleteIfExists(commandScriptFile); - Files.write(commandScriptFile, commandScript.getBytes()); - - Template commandDirectlyTemplate = engine.getTemplate(root.toFile().getName() + "/cli.ftl"); - String commandDirectlyScript = commandDirectlyTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("runtime", runtimeInfo) - .put("directly", true) - .build()); - Path commandDirectlyScriptFile = Paths.get(root.toString(), "cli_d"); - Files.deleteIfExists(commandDirectlyScriptFile); - Files.write(commandDirectlyScriptFile, commandDirectlyScript.getBytes()); - - Template updateTemplate = engine.getTemplate(root.toFile().getName() + "/update.ftl"); - String updateScript = updateTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("runtime", runtimeInfo) - .build()); - Path updateScriptFile = Paths.get(root.toString(), "update.sh"); - Files.deleteIfExists(updateScriptFile); - Files.write(updateScriptFile, updateScript.getBytes()); + generateTemplate( + root.toFile().getName() + "/cli.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("runtime", runtimeInfo) + .put("directly", false) + .build(), + Paths.get(root.toString(), "cli") + ); + generateTemplate( + root.toFile().getName() + "/cli.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("runtime", runtimeInfo) + .put("directly", true) + .build(), + Paths.get(root.toString(), "cli_d") + ); + generateTemplate( + root.toFile().getName() + "/update.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("runtime", runtimeInfo) + .build(), + Paths.get(root.toString(), "update.sh") + ); } private void generateUploader(Path root) throws IOException { @@ -299,24 +298,22 @@ public class RunnerApplication implements ApplicationRunner { String absolutRootPath = root.toAbsolutePath().toString(); logger.info("Current path: {}", absolutRootPath); - TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig("template", TemplateConfig.ResourceMode.CLASSPATH)); - Template startTemplate = engine.getTemplate("uploader/start.ftl"); - String startScript = startTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("runtime", runtimeInfo) - .build()); - Path startScriptFile = Paths.get(root.toString(), "start.sh"); - Files.deleteIfExists(startScriptFile); - Files.write(startScriptFile, startScript.getBytes()); - - Template updateTemplate = engine.getTemplate("uploader/update.ftl"); - String updateScript = updateTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("runtime", runtimeInfo) - .build()); - Path updateScriptFile = Paths.get(root.toString(), "update.sh"); - Files.deleteIfExists(updateScriptFile); - Files.write(updateScriptFile, updateScript.getBytes()); + generateTemplate( + "uploader/start.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("runtime", runtimeInfo) + .build(), + Paths.get(root.toString(), "start.sh") + ); + generateTemplate( + "uploader/update.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("runtime", runtimeInfo) + .build(), + Paths.get(root.toString(), "update.sh") + ); Template stopTemplate = engine.getTemplate("cloud/stop-script.ftl"); String stopScript = stopTemplate.render(MapUtil.builder() @@ -332,29 +329,38 @@ public class RunnerApplication implements ApplicationRunner { String absolutRootPath = root.toAbsolutePath().toString(); logger.info("Current path: {}", absolutRootPath); - TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig("template", TemplateConfig.ResourceMode.CLASSPATH)); - Template syncTemplate = engine.getTemplate("update-jar.ftl"); - String syncScript = syncTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("runtime", runtimeInfo) - .put("jarPrefix", "sync") - .put("jarName", "sync-1.0.0-SNAPSHOT.jar") - .put("uploadPath", runtimeInfo.getHudi().getAppHdfsPath()) - .build()); - Path syncScriptFile = Paths.get(root.toString(), "update-sync.sh"); - Files.deleteIfExists(syncScriptFile); - Files.write(syncScriptFile, syncScript.getBytes()); - - Template taskTemplate = engine.getTemplate("update-jar.ftl"); - String taskScript = taskTemplate.render(MapUtil.builder() - .put("currentPath", absolutRootPath) - .put("runtime", runtimeInfo) - .put("jarPrefix", "task") - .put("jarName", "service-executor-task-1.0.0-SNAPSHOT.jar") - .put("uploadPath", taskJarPath) - .build()); - Path taskScriptFile = Paths.get(root.toString(), "update-task.sh"); - Files.deleteIfExists(taskScriptFile); - Files.write(taskScriptFile, taskScript.getBytes()); + generateTemplate( + "update-jar.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("runtime", runtimeInfo) + .put("jarPrefix", "sync") + .put("jarName", "sync-1.0.0-SNAPSHOT.jar") + .put("uploadPath", runtimeInfo.getHudi().getAppHdfsPath()) + .build(), + Paths.get(root.toString(), "update-sync.sh") + ); + generateTemplate( + "update-jar.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("runtime", runtimeInfo) + .put("jarPrefix", "sync") + .put("jarName", "sync-1.0.0-SNAPSHOT.jar") + .put("uploadPath", runtimeInfo.getHudi().getAppTestHdfsPath()) + .build(), + Paths.get(root.toString(), "update-test.sh") + ); + generateTemplate( + "update-jar.ftl", + MapUtil.builder() + .put("currentPath", absolutRootPath) + .put("runtime", runtimeInfo) + .put("jarPrefix", "task") + .put("jarName", "service-executor-task-1.0.0-SNAPSHOT.jar") + .put("uploadPath", taskJarPath) + .build(), + Paths.get(root.toString(), "update-task.sh") + ); } } diff --git a/service-cli/service-cli-runner/src/main/resources/application-b12.yml b/service-cli/service-cli-runner/src/main/resources/application-b12.yml index 1c304eb..839dbdf 100644 --- a/service-cli/service-cli-runner/src/main/resources/application-b12.yml +++ b/service-cli/service-cli-runner/src/main/resources/application-b12.yml @@ -31,6 +31,7 @@ deploy: hudi: # hudi业务jar包所在目录 app-hdfs-path: hdfs://b2/apps/datalake/jars/app-b12 + app-test-hdfs-path: hdfs://b2/apps/datalake/jars/app-test-b12 # hudi指标推送 victoria-push-url: http://132.126.207.125:35710/api/v1/import/prometheus loki-push-url: ${deploy.runtime.loki.hudi-push-url} diff --git a/service-cli/service-cli-runner/src/main/resources/application.yml b/service-cli/service-cli-runner/src/main/resources/application.yml index 1c8774b..a20db40 100644 --- a/service-cli/service-cli-runner/src/main/resources/application.yml +++ b/service-cli/service-cli-runner/src/main/resources/application.yml @@ -37,6 +37,7 @@ deploy: "[connector.hadoop.kerberos-principal]": ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM "[connector.hadoop.kerberos-keytab-path]": ${deploy.runtime.kerberos-keytab-path} "[connector.hudi.app-hdfs-path]": ${deploy.runtime.hudi.app-hdfs-path} + "[connector.hudi.app-test-hdfs-path]": ${deploy.runtime.hudi.app-test-hdfs-path} "[connector.hudi.victoria-push-url]": ${deploy.runtime.hudi.victoria-push-url} "[connector.hudi.loki-push-url]": ${deploy.runtime.hudi.loki-push-url} arguments: @@ -57,6 +58,7 @@ deploy: "[connector.hadoop.kerberos-principal]": ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM "[connector.hadoop.kerberos-keytab-path]": ${deploy.runtime.kerberos-keytab-path} "[connector.hudi.app-hdfs-path]": ${deploy.runtime.hudi.app-hdfs-path} + "[connector.hudi.app-test-hdfs-path]": ${deploy.runtime.hudi.app-test-hdfs-path} "[connector.hudi.victoria-push-url]": ${deploy.runtime.hudi.victoria-push-url} "[connector.hudi.loki-push-url]": ${deploy.runtime.hudi.loki-push-url} arguments: @@ -77,6 +79,7 @@ deploy: "[connector.hadoop.kerberos-principal]": ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM "[connector.hadoop.kerberos-keytab-path]": ${deploy.runtime.kerberos-keytab-path} "[connector.hudi.app-hdfs-path]": ${deploy.runtime.hudi.app-hdfs-path} + "[connector.hudi.app-test-hdfs-path]": ${deploy.runtime.hudi.app-test-hdfs-path} "[connector.hudi.victoria-push-url]": ${deploy.runtime.hudi.victoria-push-url} "[connector.hudi.loki-push-url]": ${deploy.runtime.hudi.loki-push-url} arguments: @@ -97,6 +100,7 @@ deploy: "[connector.hadoop.kerberos-principal]": ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM "[connector.hadoop.kerberos-keytab-path]": ${deploy.runtime.kerberos-keytab-path} "[connector.hudi.app-hdfs-path]": ${deploy.runtime.hudi.app-hdfs-path} + "[connector.hudi.app-test-hdfs-path]": ${deploy.runtime.hudi.app-test-hdfs-path} "[connector.hudi.victoria-push-url]": ${deploy.runtime.hudi.victoria-push-url} "[connector.hudi.loki-push-url]": ${deploy.runtime.hudi.loki-push-url} arguments: diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java index 37f6f92..0e96ea9 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java @@ -408,6 +408,14 @@ public interface SQLConstants { * 字段 one_in_one_yarn_job_id 别名值 tafjc.one_in_one_yarn_job_id ONE_IN_ONE yarn 配置 */ String ONE_IN_ONE_YARN_JOB_ID_A = _alias_.getAlias() + "." + ONE_IN_ONE_YARN_JOB_ID_O; + /** + * 字段 tags 原始值 tags 标签 + */ + String TAGS_O = "tags"; + /** + * 字段 tags 别名值 tafjc.tags 标签 + */ + String TAGS_A = _alias_.getAlias() + "." + TAGS_O; } /** diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/FlinkJob.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/FlinkJob.java index 38174e4..6b602d7 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/FlinkJob.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/FlinkJob.java @@ -1,6 +1,7 @@ package com.lanyuanxiaoyao.service.common.entity; import java.io.Serializable; +import java.util.List; /** * Flink Job @@ -14,6 +15,7 @@ public class FlinkJob implements Serializable { private String name; private RunMode runMode; private TableMeta.YarnMeta oneInOneSyncYarn; + private List tags; public FlinkJob() { } @@ -23,6 +25,7 @@ public class FlinkJob implements Serializable { this.name = builder.name; this.runMode = builder.runMode; this.oneInOneSyncYarn = builder.oneInOneSyncYarn; + this.tags = builder.tags; } public static Builder builder() { @@ -61,14 +64,23 @@ public class FlinkJob implements Serializable { this.oneInOneSyncYarn = oneInOneSyncYarn; } + public List getTags() { + return tags; + } + + public void setTags(List tags) { + this.tags = tags; + } + @Override public String toString() { return "FlinkJob{" + - "id=" + id + - ", name='" + name + '\'' + - ", runMode=" + runMode + - ", oneInOneSyncYarn=" + oneInOneSyncYarn + - '}'; + "id=" + id + + ", name='" + name + '\'' + + ", runMode=" + runMode + + ", oneInOneSyncYarn=" + oneInOneSyncYarn + + ", tags='" + tags + '\'' + + '}'; } public enum RunMode { @@ -92,6 +104,7 @@ public class FlinkJob implements Serializable { private String name; private RunMode runMode; private TableMeta.YarnMeta oneInOneSyncYarn; + private List tags; private Builder() {} @@ -115,6 +128,11 @@ public class FlinkJob implements Serializable { return this; } + public Builder tags(List tags) { + this.tags = tags; + return this; + } + public FlinkJob build() { return new FlinkJob(this); } diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/TableMetaHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/TableMetaHelper.java index dd0fc42..8214000 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/TableMetaHelper.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/TableMetaHelper.java @@ -575,16 +575,4 @@ public class TableMetaHelper { .findFirst(); } } - - public static boolean existsTag(TableMeta meta, String tag) { - return existsTag(meta.getTags(), tag); - } - - public static boolean existsTag(String sourceTags, String tag) { - return existsTag(Arrays.asList(sourceTags.split(",")), tag); - } - - public static boolean existsTag(List sourceTags, String tag) { - return sourceTags != null && sourceTags.contains(tag); - } } diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/TagsHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/TagsHelper.java new file mode 100644 index 0000000..f5b7713 --- /dev/null +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/TagsHelper.java @@ -0,0 +1,30 @@ +package com.lanyuanxiaoyao.service.common.utils; + +import com.lanyuanxiaoyao.service.common.entity.FlinkJob; +import com.lanyuanxiaoyao.service.common.entity.TableMeta; +import java.util.Arrays; +import java.util.List; + +/** + * 标签比对 + * + * @author lanyuanxiaoyao + * @date 2024-07-29 + */ +public class TagsHelper { + public static boolean existsTag(FlinkJob job, String tag) { + return existsTag(job.getTags(), tag); + } + + public static boolean existsTag(TableMeta meta, String tag) { + return existsTag(meta.getTags(), tag); + } + + public static boolean existsTag(String sourceTags, String tag) { + return existsTag(Arrays.asList(sourceTags.split(",")), tag); + } + + public static boolean existsTag(List sourceTags, String tag) { + return sourceTags != null && sourceTags.contains(tag); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/FlinkJobService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/FlinkJobService.java index 7644db4..079fda4 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/FlinkJobService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/FlinkJobService.java @@ -6,6 +6,7 @@ import cn.hutool.core.util.ObjectUtil; import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.exception.FlinkJobNotFoundException; +import java.util.List; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; @@ -43,7 +44,8 @@ public class FlinkJobService extends BaseService { TbAppFlinkJobConfig.NAME_A, TbAppFlinkJobConfig.RUN_MODE_A, TbAppYarnJobConfig.JOB_MANAGER_MEMORY_A, - TbAppYarnJobConfig.TASK_MANAGER_MEMORY_A + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_A, + TbAppFlinkJobConfig.TAGS_A ) .from(TbAppFlinkJobConfig._alias_) .leftJoin(TbAppYarnJobConfig._alias_) @@ -63,11 +65,14 @@ public class FlinkJobService extends BaseService { .jobManagerMemory(rs.getInt(4)) .taskManagerMemory(rs.getInt(5)) .build(); + String tagText = rs.getString(6); + List tags = Lists.mutable.of(tagText.split(",")); return FlinkJob.builder() .id(rs.getLong(1)) .name(rs.getString(2)) .runMode(mode) .oneInOneSyncYarn(yarnMeta) + .tags(tags) .build(); }) ); diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java index ce3dfad..e66bc58 100644 --- a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java @@ -9,6 +9,7 @@ import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.exception.CheckpointRootPathNotFoundException; import com.lanyuanxiaoyao.service.common.utils.NameHelper; +import com.lanyuanxiaoyao.service.common.utils.TagsHelper; import com.lanyuanxiaoyao.service.configuration.HudiServiceProperties; import com.lanyuanxiaoyao.service.executor.Runner; import com.lanyuanxiaoyao.service.forest.service.InfoService; @@ -23,7 +24,18 @@ import java.util.List; import java.util.Optional; import java.util.regex.Pattern; import org.apache.flink.client.cli.ClientOptions; -import org.apache.flink.configuration.*; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.yarn.configuration.YarnConfigOptions; @@ -137,9 +149,13 @@ public class ExecutorService { ); } - private String getLatestExecutorJarPath() throws IOException { + private String getLatestExecutorJarPath(FlinkJob flinkJob) throws IOException { try (FileSystem fileSystem = HadoopUtil.createFileSystem(HadoopUtil.createConfiguration(hadoopConfiguration))) { Path root = new Path(hudiConfiguration.getAppHdfsPath()); + if (TagsHelper.existsTag(flinkJob, Constants.TAGS_USE_TEST_JAR)) { + logger.warn("Use test jar for {}", flinkJob.getId()); + root = new Path(hudiConfiguration.getAppTestHdfsPath()); + } return Lists.immutable.of(fileSystem.listStatus(root)) .select(FileStatus::isFile) .collect(FileStatus::getPath) @@ -263,7 +279,7 @@ public class ExecutorService { // configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000); // 业务jar包 - String executorJarPath = getLatestExecutorJarPath(); + String executorJarPath = getLatestExecutorJarPath(flinkJob); logger.info("Executor jar path: {}", executorJarPath); Long executorJarVersion = getLatestExecutorJarVersion(executorJarPath); configuration.set(PipelineOptions.JARS, new ArrayList() {{ @@ -314,7 +330,7 @@ public class ExecutorService { configuration.setString(YarnConfigOptions.APPLICATION_NAME, NameHelper.compactionJobName(flinkJob.getId(), tableMeta.getAlias())); - String executorJarPath = getLatestExecutorJarPath(); + String executorJarPath = getLatestExecutorJarPath(flinkJob); logger.info("Executor jar path: {}", executorJarPath); Long executorJarVersion = getLatestExecutorJarVersion(executorJarPath); configuration.set(PipelineOptions.JARS, new ArrayList() {{ diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java index a990090..0a637d2 100644 --- a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java @@ -10,7 +10,7 @@ import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.entity.compaction.ScheduleJob; import com.lanyuanxiaoyao.service.common.utils.LogHelper; import com.lanyuanxiaoyao.service.common.utils.NameHelper; -import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; +import com.lanyuanxiaoyao.service.common.utils.TagsHelper; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem; @@ -30,7 +30,6 @@ import java.time.Duration; import java.time.Instant; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import javax.annotation.PreDestroy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -218,7 +217,7 @@ public class CompactionService { TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); compactionJobGetTableInfoCost.record(Duration.between(getTableInfoStartTime, Instant.now())); - if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) { + if (TagsHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) { logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias()); return; } diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HudiConfiguration.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HudiConfiguration.java index c76495a..305037c 100644 --- a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HudiConfiguration.java +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/configuration/HudiConfiguration.java @@ -18,6 +18,7 @@ public class HudiConfiguration { private static final Logger logger = LoggerFactory.getLogger(HudiConfiguration.class); private String appHdfsPath; + private String appTestHdfsPath; private String victoriaPushUrl; private String lokiPushUrl; @@ -34,6 +35,14 @@ public class HudiConfiguration { this.appHdfsPath = appHdfsPath; } + public String getAppTestHdfsPath() { + return appTestHdfsPath; + } + + public void setAppTestHdfsPath(String appTestHdfsPath) { + this.appTestHdfsPath = appTestHdfsPath; + } + public String getVictoriaPushUrl() { return victoriaPushUrl; } @@ -54,6 +63,7 @@ public class HudiConfiguration { public String toString() { return "HudiConfiguration{" + "appHdfsPath='" + appHdfsPath + '\'' + + ", appTestHdfsPath='" + appTestHdfsPath + '\'' + ", victoriaPushUrl='" + victoriaPushUrl + '\'' + ", lokiPushUrl='" + lokiPushUrl + '\'' + '}'; diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java index 5cb89c1..6058f58 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/CrmFocusScheduleJob.java @@ -1,16 +1,13 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.compaction; -import cn.hutool.core.util.StrUtil; import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.common.Constants; -import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; +import com.lanyuanxiaoyao.service.common.utils.TagsHelper; import com.lanyuanxiaoyao.service.forest.service.HudiService; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import org.eclipse.collections.api.factory.Maps; -import org.eclipse.collections.api.list.ImmutableList; import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; import org.slf4j.Logger; @@ -48,7 +45,7 @@ public class CrmFocusScheduleJob extends BaseScheduleJob { infoService, hudiService, mapper, - meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS), + meta -> TagsHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS), comment ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java index b775e05..9f33afe 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusScheduleJob.java @@ -2,13 +2,12 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.compaction; import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.common.Constants; -import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; +import com.lanyuanxiaoyao.service.common.utils.TagsHelper; import com.lanyuanxiaoyao.service.forest.service.HudiService; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import org.eclipse.collections.api.factory.Maps; import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; import org.slf4j.Logger; @@ -46,7 +45,7 @@ public class FocusScheduleJob extends BaseScheduleJob { infoService, hudiService, mapper, - meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_FOCUS), + meta -> TagsHelper.existsTag(meta.getTags(), Constants.TAGS_FOCUS), comment ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java index 425b34b..c7393e1 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java @@ -3,13 +3,12 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.compaction; import cn.hutool.core.util.StrUtil; import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.common.Constants; -import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; +import com.lanyuanxiaoyao.service.common.utils.TagsHelper; import com.lanyuanxiaoyao.service.forest.service.HudiService; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import org.eclipse.collections.api.factory.Maps; import org.eclipse.collections.api.list.ImmutableList; import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; @@ -49,7 +48,7 @@ public class FocusUnVersionUpdateScheduleJob extends BaseScheduleJob { infoService, hudiService, mapper, - meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_FOCUS) + meta -> TagsHelper.existsTag(meta.getTags(), Constants.TAGS_FOCUS) && unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias())), comment ); diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java index c9d62d3..5d8ae23 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java @@ -2,13 +2,12 @@ package com.lanyuanxiaoyao.service.scheduler.quartz.compaction; import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.common.Constants; -import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; +import com.lanyuanxiaoyao.service.common.utils.TagsHelper; import com.lanyuanxiaoyao.service.forest.service.HudiService; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.scheduler.utils.ScheduleHelper; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import org.eclipse.collections.api.factory.Maps; import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; import org.slf4j.Logger; @@ -46,7 +45,7 @@ public class OdsFocusScheduleJob extends BaseScheduleJob { infoService, hudiService, mapper, - meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_ODS_FOCUS), + meta -> TagsHelper.existsTag(meta.getTags(), Constants.TAGS_ODS_FOCUS), comment ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java index 29355aa..5c1a1f1 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java @@ -8,7 +8,7 @@ import com.github.loki4j.slf4j.marker.LabelMarker; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.SyncState; import com.lanyuanxiaoyao.service.common.entity.compaction.ScheduleJob; -import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; +import com.lanyuanxiaoyao.service.common.utils.TagsHelper; import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.configuration.entity.info.SimpleTableMeta; import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem; @@ -83,9 +83,9 @@ public class ScheduleHelper { return false; }) // 拒绝不压缩标志的任务 - .reject(meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_NO_COMPACT)) + .reject(meta -> TagsHelper.existsTag(meta.getTags(), Constants.TAGS_NO_COMPACT)) // 拒绝不调度压缩标志的任务 - .reject(meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_NO_SCHEDULE_COMPACT)) + .reject(meta -> TagsHelper.existsTag(meta.getTags(), Constants.TAGS_NO_SCHEDULE_COMPACT)) .collect(meta -> { long compactionDuration = 0L; try { @@ -116,7 +116,7 @@ public class ScheduleHelper { // 统一在这里覆盖特定请求 // CRM重点表独占A4集群 - if (TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS)) { + if (TagsHelper.existsTag(meta.getTags(), Constants.TAGS_CRM_FOCUS)) { finalMetadata.put(Constants.SCHEDULE_FORCE, Constants.CLUSTER_A4); } else { finalMetadata.put(Constants.SCHEDULE_ESCAPE, Constants.CLUSTER_A4); diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkJobVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkJobVO.java index 3a86cf9..a4e75ef 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkJobVO.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkJobVO.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.web.entity; import com.fasterxml.jackson.annotation.JsonIgnore; import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.TableMeta; +import java.util.List; /** * 避免前段不支持java long类型的精度 @@ -34,6 +35,10 @@ public class FlinkJobVO { return flinkJob.getOneInOneSyncYarn(); } + public List getTags() { + return flinkJob.getTags(); + } + @Override public String toString() { return "FlinkJobVO{" + diff --git a/service-web/src/main/resources/static/components/common.js b/service-web/src/main/resources/static/components/common.js index 1250971..48d3c48 100644 --- a/service-web/src/main/resources/static/components/common.js +++ b/service-web/src/main/resources/static/components/common.js @@ -816,7 +816,7 @@ function copyField(field, tips = '复制', ignoreLength = 0) { } } -function flinkJobProperty(id, name, runMode) { +function flinkJobProperty(id, name, runMode, tags) { return { type: 'property', title: 'Flink Job 配置', @@ -829,6 +829,15 @@ function flinkJobProperty(id, name, runMode) { ...mappingField(`${runMode}`, runModeMapping), } }, + { + label: '标签', + content: { + type: 'each', + source: `\${SPLIT(${tags}, ",")}`, + items: mappingField('item', tagsMapping), + }, + span: 3, + }, ], } } @@ -875,7 +884,7 @@ function flinkJobDialog() { showCloseButton: false, size: 'md', body: [ - flinkJobProperty('flinkJobId', 'flinkJob.name', 'flinkJob.runMode'), + flinkJobProperty('flinkJobId', 'flinkJob.name', 'flinkJob.runMode', 'flinkJob.tags'), {type: 'divider'}, { type: 'action', @@ -1797,7 +1806,7 @@ function tableMetaDialog() { ...runMetaProperty('compaction'), }, {type: 'divider'}, - flinkJobProperty('flinkJobId', 'flinkJob.name', 'flinkJob.runMode'), + flinkJobProperty('flinkJobId', 'flinkJob.name', 'flinkJob.runMode', 'flinkJob.tags'), {type: 'divider'}, { type: 'property', @@ -2038,6 +2047,7 @@ let tagsMapping = [ mappingItem('取消算子合并', 'DISABLE_CHAINING'), mappingItem('跟踪压缩op_ts', 'TRACE_LATEST_OP_TS'), mappingItem('不使用HSync', 'DISABLE_HSYNC'), + mappingItem('测试包', 'USE_TEST_JAR'), ] let hudiTableTypeMapping = [ diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java index a661980..ba305a5 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java @@ -9,12 +9,16 @@ import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.exception.CheckpointRootPathNotFoundException; import com.lanyuanxiaoyao.service.common.exception.ZookeeperUrlNotFoundException; import com.lanyuanxiaoyao.service.common.utils.NameHelper; -import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; +import com.lanyuanxiaoyao.service.common.utils.TagsHelper; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; import com.lanyuanxiaoyao.service.sync.functions.PulsarMessage2RecordFunction; import com.lanyuanxiaoyao.service.sync.functions.PulsarMessageSourceReader; import com.lanyuanxiaoyao.service.sync.functions.ValidateRecordFilter; -import com.lanyuanxiaoyao.service.sync.utils.*; +import com.lanyuanxiaoyao.service.sync.utils.ArgumentsUtils; +import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils; +import com.lanyuanxiaoyao.service.sync.utils.StatusUtils; +import com.lanyuanxiaoyao.service.sync.utils.SyncUtils; +import com.lanyuanxiaoyao.service.sync.utils.ZkUtils; import java.io.IOException; import java.util.List; import java.util.Map; @@ -34,7 +38,10 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies. import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.lanyuanxiaoyao.service.common.Constants.*; +import static com.lanyuanxiaoyao.service.common.Constants.GB; +import static com.lanyuanxiaoyao.service.common.Constants.HOUR; +import static com.lanyuanxiaoyao.service.common.Constants.MINUTE; +import static com.lanyuanxiaoyao.service.common.Constants.TAGS_DISABLE_CHAINING; /** * 同步应用 @@ -92,7 +99,7 @@ public class Synchronizer { environment.getCheckpointConfig().enableUnalignedCheckpoints(); environment.getCheckpointConfig().setTolerableCheckpointFailureNumber(5); - if (tableMetaList.stream().anyMatch(meta -> TableMetaHelper.existsTag(meta, TAGS_DISABLE_CHAINING))) { + if (tableMetaList.stream().anyMatch(meta -> TagsHelper.existsTag(meta, TAGS_DISABLE_CHAINING))) { logger.warn("Disable operator chaining"); environment.disableOperatorChaining(); } @@ -148,7 +155,7 @@ public class Synchronizer { SingleOutputStreamOperator source = environment .addSource(new PulsarMessageSourceReader(configuration, flinkJob, tableMeta)) .setParallelism(tableMeta.getHudi().getSourceTasks()); - if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_PULSAR_BACKUP)) { + if (TagsHelper.existsTag(tableMeta, Constants.TAGS_PULSAR_BACKUP)) { Path path = new Path(StrUtil.format("hdfs://b2/apps/datalake/hive_test/source/{}/{}", String.join("_", flinkJob.getName().split("\\s")), tableMeta.getAlias())); StreamingFileSink fileSink = StreamingFileSink.forRowFormat(path, new SimpleStringEncoder<>("UTF-8")) .withRollingPolicy(DefaultRollingPolicy.builder() diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ArgumentsUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ArgumentsUtils.java index 69af93d..f1051e7 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ArgumentsUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ArgumentsUtils.java @@ -72,7 +72,7 @@ public class ArgumentsUtils { return JacksonUtils.getMapper().readValue(argsTool.get(Constants.FLINK_JOB), FlinkJob.class); } - public static String getInstants(String[] args) throws JsonProcessingException { + public static String getInstants(String[] args) { ParameterTool argsTool = ParameterTool.fromArgs(args); if (!argsTool.has(Constants.INSTANTS)) { return ""; diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java index 576cd38..3be2367 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java @@ -7,6 +7,7 @@ import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; +import com.lanyuanxiaoyao.service.common.utils.TagsHelper; import com.lanyuanxiaoyao.service.sync.configuration.DefaultPartitionNameKeyGenerator; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; import java.util.Optional; @@ -104,17 +105,17 @@ public class ConfigurationUtils { configuration.setString(FlinkOptions.PATH, tableMeta.getHudi().getTargetHdfsPath()); configuration.setString(FlinkOptions.RECORD_KEY_FIELD, Constants.UNION_KEY_NAME); configuration.setBoolean(FlinkOptions.PRE_COMBINE, false); - if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_PRE_COMBINE)) { + if (TagsHelper.existsTag(tableMeta, Constants.TAGS_PRE_COMBINE)) { configuration.setBoolean(FlinkOptions.PRE_COMBINE, true); } configuration.setString(FlinkOptions.PRECOMBINE_FIELD, Constants.UPDATE_TIMESTAMP_KEY_NAME); configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, schema.toString()); - if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_NO_IGNORE_FAILED)) { + if (TagsHelper.existsTag(tableMeta, Constants.TAGS_NO_IGNORE_FAILED)) { configuration.setBoolean(FlinkOptions.IGNORE_FAILED, false); } - if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_DISABLE_HSYNC)) { + if (TagsHelper.existsTag(tableMeta, Constants.TAGS_DISABLE_HSYNC)) { logger.info("Disable hsync"); configuration.setBoolean(HoodieWriteConfig.USE_HSYNC.key(), false); }