diff --git a/service-cli/service-cli-core/src/main/java/com/lanyuanxiaoyao/service/cli/core/RuntimeInfo.java b/service-cli/service-cli-core/src/main/java/com/lanyuanxiaoyao/service/cli/core/RuntimeInfo.java index ecaf519..00bff1b 100644 --- a/service-cli/service-cli-core/src/main/java/com/lanyuanxiaoyao/service/cli/core/RuntimeInfo.java +++ b/service-cli/service-cli-core/src/main/java/com/lanyuanxiaoyao/service/cli/core/RuntimeInfo.java @@ -7,6 +7,7 @@ package com.lanyuanxiaoyao.service.cli.core; * @date 2023-05-17 */ public class RuntimeInfo { + private String signature; private String user; private String jarPath; private String jdkPath; @@ -22,6 +23,14 @@ public class RuntimeInfo { private SecurityInfo security; private YarnInfo yarn; + public String getSignature() { + return signature; + } + + public void setSignature(String signature) { + this.signature = signature; + } + public String getUser() { return user; } @@ -137,7 +146,8 @@ public class RuntimeInfo { @Override public String toString() { return "RuntimeInfo{" + - "user='" + user + '\'' + + "signature='" + signature + '\'' + + ", user='" + user + '\'' + ", jarPath='" + jarPath + '\'' + ", jdkPath='" + jdkPath + '\'' + ", logPath='" + logPath + '\'' + diff --git a/service-cli/service-cli-runner/src/main/resources/application-b12.yml b/service-cli/service-cli-runner/src/main/resources/application-b12.yml index 735bf68..a57ae88 100644 --- a/service-cli/service-cli-runner/src/main/resources/application-b12.yml +++ b/service-cli/service-cli-runner/src/main/resources/application-b12.yml @@ -1,5 +1,7 @@ deploy: runtime: + # 整套配置唯一标识,通常用于区分生产和测试,避免某些配置混淆,如pulsar订阅名 + signature: b12 # 应用部署主机用户 user: datalake # 应用jar包 diff --git a/service-cli/service-cli-runner/src/main/resources/application-b5.yml b/service-cli/service-cli-runner/src/main/resources/application-b5.yml index 10d6e91..134018b 100644 --- a/service-cli/service-cli-runner/src/main/resources/application-b5.yml +++ b/service-cli/service-cli-runner/src/main/resources/application-b5.yml @@ -1,5 +1,6 @@ deploy: runtime: + signature: b5 user: datalake jar-path: /apps/datalake/hudi/jars jdk-path: /opt/jdk1.8.0_162/bin/java diff --git a/service-cli/service-cli-runner/src/main/resources/template/cloud/deploy.ftl b/service-cli/service-cli-runner/src/main/resources/template/cloud/deploy.ftl index 0a2e6f9..4047886 100644 --- a/service-cli/service-cli-runner/src/main/resources/template/cloud/deploy.ftl +++ b/service-cli/service-cli-runner/src/main/resources/template/cloud/deploy.ftl @@ -13,7 +13,7 @@ hostname_full=`ssh $host 'hostname -f'` <#-- 获取当前时间 --> start_time=`date +%Y%m%d%H%M%S` ssh $host "mkdir -p ${runtime.jarPath};curl ${runtime.downloadUrl}/${info.sourceJar} -o ${runtime.jarPath}/${info.name}.jar" -ssh $host "export JASYPT_ENCRYPTOR_PASSWORD='r#(R,P\"Dp^A47>WSn:Wn].gs/+\"v:q_Q*An~zF*g-@j@jtSTv5H/,S-3:R?r9R}.';nohup ${runtime.jdkPath} <#list environments?keys as key>-D${key}=${environments[key]?string} -jar ${runtime.jarPath}/${info.name}.jar <#noparse>--deploy.datetime=${datetime} --deploy.ip=${host} --deploy.hostname=${hostname} --deploy.hostname-full=${hostname_full} --deploy.start-time=${start_time} --logging.parent=${runtime.logPath} --loki.url=${runtime.loki.servicePushUrl} --spring.cloud.zookeeper.connect-string=${runtime.zkUrl} --spring.security.meta.authority='${runtime.security.authority}' --spring.security.meta.username='${runtime.security.username}' --spring.security.meta.darkcode='${runtime.security.darkcode}' --yarn-cluster.sync-clusters=${runtime.yarn.syncClusters} --yarn-cluster.compaction-clusters=${runtime.yarn.compactionClusters} <#list arguments?keys as key>--${key}=${arguments[key]?string} > /dev/null 2>&1 &" +ssh $host "export JASYPT_ENCRYPTOR_PASSWORD='r#(R,P\"Dp^A47>WSn:Wn].gs/+\"v:q_Q*An~zF*g-@j@jtSTv5H/,S-3:R?r9R}.';nohup ${runtime.jdkPath} <#list environments?keys as key>-D${key}=${environments[key]?string} -jar ${runtime.jarPath}/${info.name}.jar <#noparse>--deploy.datetime=${datetime} --deploy.ip=${host} --deploy.hostname=${hostname} --deploy.hostname-full=${hostname_full} --deploy.start-time=${start_time} --hudi-service.signature=${runtime.signature} --logging.parent=${runtime.logPath} --loki.url=${runtime.loki.servicePushUrl} --spring.cloud.zookeeper.connect-string=${runtime.zkUrl} --spring.security.meta.authority='${runtime.security.authority}' --spring.security.meta.username='${runtime.security.username}' --spring.security.meta.darkcode='${runtime.security.darkcode}' --yarn-cluster.sync-clusters=${runtime.yarn.syncClusters} --yarn-cluster.compaction-clusters=${runtime.yarn.compactionClusters} <#list arguments?keys as key>--${key}=${arguments[key]?string} > /dev/null 2>&1 &" echo '' diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index f2701c2..c43de53 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -60,6 +60,7 @@ public interface Constants { String INSTANTS = "instants"; String BETA = "beta"; String CLUSTER = "cluster"; + String SIGNATURE = "signature"; String COW = "COPY_ON_WRITE"; String MOR = "MERGE_ON_READ"; diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/RunMeta.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/RunMeta.java index 273ead9..5c827cc 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/RunMeta.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/entity/RunMeta.java @@ -37,16 +37,16 @@ public class RunMeta implements Serializable { this.applicationProxy = System.getenv("APPLICATION_WEB_PROXY_BASE"); } - public RunMeta(String cluster, Long flinkJobId) { + public RunMeta(String signature, String cluster, Long flinkJobId) { this(); this.cluster = cluster; this.flinkJobId = flinkJobId; } - public RunMeta(String cluster, Long flinkJobId, String alias) { - this(cluster, flinkJobId); + public RunMeta(String signature, String cluster, Long flinkJobId, String alias) { + this(signature, cluster, flinkJobId); this.alias = alias; - this.subscriptionName = NameHelper.pulsarSubscriptionName(flinkJobId, alias); + this.subscriptionName = NameHelper.pulsarSubscriptionName(flinkJobId, alias, signature); } public String getCluster() { diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/NameHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/NameHelper.java index 31db70a..64b676c 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/NameHelper.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/NameHelper.java @@ -10,8 +10,8 @@ import com.lanyuanxiaoyao.service.common.Constants; * @date 2022-06-24 */ public class NameHelper { - public static String pulsarSubscriptionName(Long flinkJobId, String alias) { - return Constants.PULSAR_SUBSCRIPTION_NAME_PREFIX + "_" + flinkJobId + "_" + alias + "_20230425"; + public static String pulsarSubscriptionName(Long flinkJobId, String alias, String suffix) { + return Constants.PULSAR_SUBSCRIPTION_NAME_PREFIX + "_" + flinkJobId + "_" + alias + "_" + suffix; } // Sync job name diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/HudiServiceProperties.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/HudiServiceProperties.java new file mode 100644 index 0000000..3bf88e9 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/HudiServiceProperties.java @@ -0,0 +1,31 @@ +package com.lanyuanxiaoyao.service.configuration; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * 配置标志 + * + * @author lanyuanxiaoyao + * @date 2024-03-19 + */ +@Component +@ConfigurationProperties("hudi-service") +public class HudiServiceProperties { + private String signature; + + public String getSignature() { + return signature; + } + + public void setSignature(String signature) { + this.signature = signature; + } + + @Override + public String toString() { + return "HudiServiceProperties{" + + "signature='" + signature + '\'' + + '}'; + } +} diff --git a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java index fa71a0c..fdc277c 100644 --- a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java +++ b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java @@ -3,11 +3,11 @@ package com.lanyuanxiaoyao.service.monitor.metric; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.utils.NameHelper; +import com.lanyuanxiaoyao.service.configuration.HudiServiceProperties; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.forest.service.PulsarService; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Maps; @@ -33,13 +33,15 @@ public class PulsarMetrics extends Metrics { private final MeterRegistry registry; private final InfoService infoService; private final PulsarService pulsarService; + private final HudiServiceProperties hudiServiceProperties; private final MutableMap backlogMap; - public PulsarMetrics(MeterRegistry registry, InfoService infoService, PulsarService pulsarService) { + public PulsarMetrics(MeterRegistry registry, InfoService infoService, PulsarService pulsarService, HudiServiceProperties hudiServiceProperties) { this.registry = registry; this.infoService = infoService; this.pulsarService = pulsarService; + this.hudiServiceProperties = hudiServiceProperties; backlogMap = Maps.mutable.empty(); } @@ -67,7 +69,7 @@ public class PulsarMetrics extends Metrics { ); String name = pulsarService.name(meta.getPulsarAddress()); if (StrUtil.isNotBlank(name)) { - Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias())); + Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias(), hudiServiceProperties.getSignature())); backlogCache.set(backlog); } } catch (Exception ignored) { diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java index fcabc1c..bd3a092 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java @@ -70,15 +70,17 @@ public class Compactor { TableMeta tableMeta = ArgumentsUtils.getTableMeta(args); String selectedInstants = ArgumentsUtils.getInstants(args); String cluster = ArgumentsUtils.getCluster(args); + String signature = ArgumentsUtils.getSignature(args); logger.info("Bootstrap flink job: {}", mapper.writeValueAsString(flinkJob)); logger.info("Bootstrap table meta: {}", mapper.writeValueAsString(tableMeta)); logger.info("Bootstrap instants: {}", selectedInstants); logger.info("Bootstrap cluster: {}", cluster); + logger.info("Bootstrap signature: {}", signature); String applicationId = System.getenv("_APP_ID"); - RunMeta runMeta = new RunMeta(cluster, flinkJob.getId(), tableMeta.getAlias()); + RunMeta runMeta = new RunMeta(signature, cluster, flinkJob.getId(), tableMeta.getAlias()); logger.info("Run meta: {}", runMeta); ZkUtils.createCompactionLock(flinkJob, tableMeta, tableMeta.getConfig().getZookeeperUrl(), mapper.writeValueAsString(runMeta)); logger.info("Lock for {} {} success", flinkJob.getId(), tableMeta.getAlias()); diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java index c5acfac..713ae8f 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java @@ -59,20 +59,22 @@ public class Synchronizer { FlinkJob flinkJob = ArgumentsUtils.getFlinkJob(args); List tableMetaList = ArgumentsUtils.getTableMetaList(args); String cluster = ArgumentsUtils.getCluster(args); + String signature = ArgumentsUtils.getSignature(args); logger.info("Bootstrap flink job: {}", mapper.writeValueAsString(flinkJob)); logger.info("Bootstrap table meta list: {}", mapper.writeValueAsString(tableMetaList)); logger.info("Bootstrap cluster: {}", cluster); + logger.info("Bootstrap signature: {}", signature); String applicationId = System.getenv("_APP_ID"); String zkUrl = findConfigFromList(tableMetaList, meta -> meta.getConfig().getZookeeperUrl(), ZookeeperUrlNotFoundException::new); for (TableMeta tableMeta : tableMetaList) { - RunMeta runMeta = new RunMeta(cluster, flinkJob.getId(), tableMeta.getAlias()); + RunMeta runMeta = new RunMeta(signature, cluster, flinkJob.getId(), tableMeta.getAlias()); logger.info("Run meta: {}", runMeta); ZkUtils.createSynchronizerLock(flinkJob, tableMeta, zkUrl, mapper.writeValueAsString(runMeta)); } - RunMeta runMeta = new RunMeta(cluster, flinkJob.getId()); + RunMeta runMeta = new RunMeta(signature, cluster, flinkJob.getId()); logger.info("Run meta: {}", runMeta); ZkUtils.createSynchronizerLock(flinkJob, zkUrl, mapper.writeValueAsString(runMeta)); logger.info("Lock for {} success", flinkJob.getId()); diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/GlobalConfiguration.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/GlobalConfiguration.java index 3219277..c13c576 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/GlobalConfiguration.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/GlobalConfiguration.java @@ -10,6 +10,7 @@ import java.io.Serializable; * @date 2022-06-13 */ public class GlobalConfiguration implements Serializable { + private final String signature; private final String cluster; private final String applicationId; private final Boolean metricEnable = false; @@ -21,7 +22,8 @@ public class GlobalConfiguration implements Serializable { private final Integer metricPublishTimeout; private final Integer metricPublishBatch; - public GlobalConfiguration(String cluster, String applicationId, TableMeta meta) { + public GlobalConfiguration(String signature, String cluster, String applicationId, TableMeta meta) { + this.signature = signature; this.cluster = cluster; this.applicationId = applicationId; this.metricPublishUrl = meta.getConfig().getMetricPublishUrl(); @@ -33,6 +35,10 @@ public class GlobalConfiguration implements Serializable { this.metricPublishBatch = meta.getConfig().getMetricPublishBatch(); } + public String getSignature() { + return signature; + } + public String getCluster() { return cluster; } @@ -77,7 +83,8 @@ public class GlobalConfiguration implements Serializable { @Override public String toString() { return "GlobalConfiguration{" + - "cluster='" + cluster + '\'' + + "signature='" + signature + '\'' + + ", cluster='" + cluster + '\'' + ", applicationId='" + applicationId + '\'' + ", metricEnable=" + metricEnable + ", metricPublishUrl='" + metricPublishUrl + '\'' + diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java index 98b32d4..accd769 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java @@ -175,7 +175,7 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction