fix(launcher): 修复launcher没有传递signature

This commit is contained in:
v-zhangjc9
2024-03-19 15:06:11 +08:00
parent 5771ec238b
commit 9ecaa73a70
2 changed files with 25 additions and 11 deletions

View File

@@ -71,6 +71,7 @@ public interface Constants {
String INSTANTS_OPTION = "-" + INSTANTS;
String BETA_OPTION = "-" + BETA;
String CLUSTER_OPTION = "-" + CLUSTER;
String SIGNATURE_OPTION = "-" + SIGNATURE;
String SPRING_SECURITY_AUTHORITY = "Anonymous";
String SPRING_SECURITY_USERNAME = "AxhEbscwsJDbYMH2";
@@ -117,6 +118,7 @@ public interface Constants {
String METRICS_LABEL_RUN_TYPE = "run_type";
String METRICS_LABEL_EXECUTOR_VERSION = "executor_version";
String METRICS_LABEL_CLUSTER = "cluster";
String METRICS_LABEL_SIGNATURE = "signature";
String METRICS_RUN_TYPE_SYNC = "sync";
String METRICS_RUN_TYPE_COMPACTION = "compaction";

View File

@@ -9,6 +9,7 @@ import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.exception.CheckpointRootPathNotFoundException;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import com.lanyuanxiaoyao.service.configuration.HudiServiceProperties;
import com.lanyuanxiaoyao.service.executor.Runner;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration;
@@ -21,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import javax.annotation.Resource;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.configuration.*;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -53,14 +53,20 @@ public class ExecutorService {
private static final Logger logger = LoggerFactory.getLogger(ExecutorService.class);
private static final ObjectMapper MAPPER = JacksonUtil.getMapper();
private static final Pattern EXECUTOR_JAR_NAME = Pattern.compile(".+sync-(\\d+)\\.jar");
@Resource
private InfoService infoService;
@Resource
private HadoopConfiguration hadoopConfiguration;
@Resource
private HudiConfiguration hudiConfiguration;
private final InfoService infoService;
private final HadoopConfiguration hadoopConfiguration;
private final HudiConfiguration hudiConfiguration;
private final HudiServiceProperties hudiServiceProperties;
public static String[] syncArgs(FlinkJob flinkJob, ImmutableList<TableMeta> tableMetaList, String cluster) throws JsonProcessingException {
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public ExecutorService(InfoService infoService, HadoopConfiguration hadoopConfiguration, HudiConfiguration hudiConfiguration, HudiServiceProperties hudiServiceProperties) {
this.infoService = infoService;
this.hadoopConfiguration = hadoopConfiguration;
this.hudiConfiguration = hudiConfiguration;
this.hudiServiceProperties = hudiServiceProperties;
}
public static String[] syncArgs(FlinkJob flinkJob, ImmutableList<TableMeta> tableMetaList, String cluster, String signature) throws JsonProcessingException {
List<String> argsList = Lists.mutable.empty();
argsList.add(Constants.FLINK_JOB_OPTION);
argsList.add(MAPPER.writeValueAsString(flinkJob));
@@ -68,10 +74,12 @@ public class ExecutorService {
argsList.add(MAPPER.writeValueAsString(tableMetaList));
argsList.add(Constants.CLUSTER_OPTION);
argsList.add(cluster);
argsList.add(Constants.SIGNATURE_OPTION);
argsList.add(signature);
return argsList.toArray(new String[]{});
}
public static String[] compactionArgs(FlinkJob flinkJob, TableMeta tableMeta, String instants, String cluster) throws JsonProcessingException {
public static String[] compactionArgs(FlinkJob flinkJob, TableMeta tableMeta, String instants, String cluster, String signature) throws JsonProcessingException {
List<String> argsList = Lists.mutable.empty();
argsList.add(Constants.FLINK_JOB_OPTION);
argsList.add(MAPPER.writeValueAsString(flinkJob));
@@ -83,6 +91,8 @@ public class ExecutorService {
}
argsList.add(Constants.CLUSTER_OPTION);
argsList.add(cluster);
argsList.add(Constants.SIGNATURE_OPTION);
argsList.add(signature);
return argsList.toArray(new String[]{});
}
@@ -267,6 +277,7 @@ public class ExecutorService {
setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJobName);
setEnvironment(configuration, Constants.METRICS_LABEL_EXECUTOR_VERSION, String.valueOf(executorJarVersion));
setEnvironment(configuration, Constants.METRICS_LABEL_CLUSTER, cluster);
setEnvironment(configuration, Constants.METRICS_LABEL_SIGNATURE, hudiServiceProperties.getSignature());
setEnvironment(configuration, Constants.LOKI_PUSH_URL, hudiConfiguration.getLokiPushUrl());
@@ -280,7 +291,7 @@ public class ExecutorService {
return Runner.run(
configuration,
"com.lanyuanxiaoyao.service.sync.Synchronizer",
syncArgs(flinkJob, tableMetaList, cluster)
syncArgs(flinkJob, tableMetaList, cluster, hudiServiceProperties.getSignature())
);
}
@@ -318,6 +329,7 @@ public class ExecutorService {
setEnvironment(configuration, Constants.METRICS_LABEL_BATCH_ID, batchCode);
setEnvironment(configuration, Constants.METRICS_LABEL_EXECUTOR_VERSION, String.valueOf(executorJarVersion));
setEnvironment(configuration, Constants.METRICS_LABEL_CLUSTER, cluster);
setEnvironment(configuration, Constants.METRICS_LABEL_SIGNATURE, hudiServiceProperties.getSignature());
setEnvironment(configuration, Constants.LOKI_PUSH_URL, hudiConfiguration.getLokiPushUrl());
@@ -335,7 +347,7 @@ public class ExecutorService {
return Runner.run(
configuration,
"com.lanyuanxiaoyao.service.sync.Compactor",
compactionArgs(flinkJob, tableMeta, instants, cluster)
compactionArgs(flinkJob, tableMeta, instants, cluster, hudiServiceProperties.getSignature())
);
}