feat(configuration): 增加signature标志 用于区分不同的服务群

比如生产环境和测试环境需要订阅同一个pulsar topic,同样的订阅名称会产生冲突,需要从集群层面避免冲突,所以增加了一个「签名(signature)」在需要的地方可以用于识别不同的两个集群
This commit is contained in:
v-zhangjc9
2024-03-19 14:30:17 +08:00
parent 4f055bfcf8
commit 5771ec238b
14 changed files with 83 additions and 17 deletions

View File

@@ -70,15 +70,17 @@ public class Compactor {
TableMeta tableMeta = ArgumentsUtils.getTableMeta(args);
String selectedInstants = ArgumentsUtils.getInstants(args);
String cluster = ArgumentsUtils.getCluster(args);
String signature = ArgumentsUtils.getSignature(args);
logger.info("Bootstrap flink job: {}", mapper.writeValueAsString(flinkJob));
logger.info("Bootstrap table meta: {}", mapper.writeValueAsString(tableMeta));
logger.info("Bootstrap instants: {}", selectedInstants);
logger.info("Bootstrap cluster: {}", cluster);
logger.info("Bootstrap signature: {}", signature);
String applicationId = System.getenv("_APP_ID");
RunMeta runMeta = new RunMeta(cluster, flinkJob.getId(), tableMeta.getAlias());
RunMeta runMeta = new RunMeta(signature, cluster, flinkJob.getId(), tableMeta.getAlias());
logger.info("Run meta: {}", runMeta);
ZkUtils.createCompactionLock(flinkJob, tableMeta, tableMeta.getConfig().getZookeeperUrl(), mapper.writeValueAsString(runMeta));
logger.info("Lock for {} {} success", flinkJob.getId(), tableMeta.getAlias());

View File

@@ -59,20 +59,22 @@ public class Synchronizer {
FlinkJob flinkJob = ArgumentsUtils.getFlinkJob(args);
List<TableMeta> tableMetaList = ArgumentsUtils.getTableMetaList(args);
String cluster = ArgumentsUtils.getCluster(args);
String signature = ArgumentsUtils.getSignature(args);
logger.info("Bootstrap flink job: {}", mapper.writeValueAsString(flinkJob));
logger.info("Bootstrap table meta list: {}", mapper.writeValueAsString(tableMetaList));
logger.info("Bootstrap cluster: {}", cluster);
logger.info("Bootstrap signature: {}", signature);
String applicationId = System.getenv("_APP_ID");
String zkUrl = findConfigFromList(tableMetaList, meta -> meta.getConfig().getZookeeperUrl(), ZookeeperUrlNotFoundException::new);
for (TableMeta tableMeta : tableMetaList) {
RunMeta runMeta = new RunMeta(cluster, flinkJob.getId(), tableMeta.getAlias());
RunMeta runMeta = new RunMeta(signature, cluster, flinkJob.getId(), tableMeta.getAlias());
logger.info("Run meta: {}", runMeta);
ZkUtils.createSynchronizerLock(flinkJob, tableMeta, zkUrl, mapper.writeValueAsString(runMeta));
}
RunMeta runMeta = new RunMeta(cluster, flinkJob.getId());
RunMeta runMeta = new RunMeta(signature, cluster, flinkJob.getId());
logger.info("Run meta: {}", runMeta);
ZkUtils.createSynchronizerLock(flinkJob, zkUrl, mapper.writeValueAsString(runMeta));
logger.info("Lock for {} success", flinkJob.getId());

View File

@@ -10,6 +10,7 @@ import java.io.Serializable;
* @date 2022-06-13
*/
public class GlobalConfiguration implements Serializable {
private final String signature;
private final String cluster;
private final String applicationId;
private final Boolean metricEnable = false;
@@ -21,7 +22,8 @@ public class GlobalConfiguration implements Serializable {
private final Integer metricPublishTimeout;
private final Integer metricPublishBatch;
public GlobalConfiguration(String cluster, String applicationId, TableMeta meta) {
public GlobalConfiguration(String signature, String cluster, String applicationId, TableMeta meta) {
this.signature = signature;
this.cluster = cluster;
this.applicationId = applicationId;
this.metricPublishUrl = meta.getConfig().getMetricPublishUrl();
@@ -33,6 +35,10 @@ public class GlobalConfiguration implements Serializable {
this.metricPublishBatch = meta.getConfig().getMetricPublishBatch();
}
public String getSignature() {
return signature;
}
public String getCluster() {
return cluster;
}
@@ -77,7 +83,8 @@ public class GlobalConfiguration implements Serializable {
@Override
public String toString() {
return "GlobalConfiguration{" +
"cluster='" + cluster + '\'' +
"signature='" + signature + '\'' +
", cluster='" + cluster + '\'' +
", applicationId='" + applicationId + '\'' +
", metricEnable=" + metricEnable +
", metricPublishUrl='" + metricPublishUrl + '\'' +

View File

@@ -175,7 +175,7 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
reader = client.newReader(new StringSchema())
.topic(topic)
.receiverQueueSize(10000)
.subscriptionName(NameHelper.pulsarSubscriptionName(flinkJob.getId(), tableMeta.getAlias()))
.subscriptionName(NameHelper.pulsarSubscriptionName(flinkJob.getId(), tableMeta.getAlias(), globalConfiguration.getSignature()))
.startMessageId(lastMessageId.get())
.startMessageIdInclusive()
.create();

View File

@@ -95,4 +95,12 @@ public class ArgumentsUtils {
}
return argsTool.get(Constants.CLUSTER);
}
public static String getSignature(String[] args) {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(Constants.SIGNATURE)) {
return "";
}
return argsTool.get(Constants.SIGNATURE);
}
}