diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index c43de53..37c3a16 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -71,6 +71,7 @@ public interface Constants { String INSTANTS_OPTION = "-" + INSTANTS; String BETA_OPTION = "-" + BETA; String CLUSTER_OPTION = "-" + CLUSTER; + String SIGNATURE_OPTION = "-" + SIGNATURE; String SPRING_SECURITY_AUTHORITY = "Anonymous"; String SPRING_SECURITY_USERNAME = "AxhEbscwsJDbYMH2"; @@ -117,6 +118,7 @@ public interface Constants { String METRICS_LABEL_RUN_TYPE = "run_type"; String METRICS_LABEL_EXECUTOR_VERSION = "executor_version"; String METRICS_LABEL_CLUSTER = "cluster"; + String METRICS_LABEL_SIGNATURE = "signature"; String METRICS_RUN_TYPE_SYNC = "sync"; String METRICS_RUN_TYPE_COMPACTION = "compaction"; diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java index 96f2934..a1c0fad 100644 --- a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/ExecutorService.java @@ -9,6 +9,7 @@ import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.exception.CheckpointRootPathNotFoundException; import com.lanyuanxiaoyao.service.common.utils.NameHelper; +import com.lanyuanxiaoyao.service.configuration.HudiServiceProperties; import com.lanyuanxiaoyao.service.executor.Runner; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration; @@ -21,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.regex.Pattern; -import javax.annotation.Resource; import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.configuration.*; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; @@ -53,14 +53,20 @@ public class ExecutorService { private static final Logger logger = LoggerFactory.getLogger(ExecutorService.class); private static final ObjectMapper MAPPER = JacksonUtil.getMapper(); private static final Pattern EXECUTOR_JAR_NAME = Pattern.compile(".+sync-(\\d+)\\.jar"); - @Resource - private InfoService infoService; - @Resource - private HadoopConfiguration hadoopConfiguration; - @Resource - private HudiConfiguration hudiConfiguration; + private final InfoService infoService; + private final HadoopConfiguration hadoopConfiguration; + private final HudiConfiguration hudiConfiguration; + private final HudiServiceProperties hudiServiceProperties; - public static String[] syncArgs(FlinkJob flinkJob, ImmutableList tableMetaList, String cluster) throws JsonProcessingException { + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public ExecutorService(InfoService infoService, HadoopConfiguration hadoopConfiguration, HudiConfiguration hudiConfiguration, HudiServiceProperties hudiServiceProperties) { + this.infoService = infoService; + this.hadoopConfiguration = hadoopConfiguration; + this.hudiConfiguration = hudiConfiguration; + this.hudiServiceProperties = hudiServiceProperties; + } + + public static String[] syncArgs(FlinkJob flinkJob, ImmutableList tableMetaList, String cluster, String signature) throws JsonProcessingException { List argsList = Lists.mutable.empty(); argsList.add(Constants.FLINK_JOB_OPTION); argsList.add(MAPPER.writeValueAsString(flinkJob)); @@ -68,10 +74,12 @@ public class ExecutorService { argsList.add(MAPPER.writeValueAsString(tableMetaList)); argsList.add(Constants.CLUSTER_OPTION); argsList.add(cluster); + argsList.add(Constants.SIGNATURE_OPTION); + argsList.add(signature); return argsList.toArray(new String[]{}); } - public static String[] compactionArgs(FlinkJob flinkJob, TableMeta tableMeta, String instants, String cluster) throws JsonProcessingException { + public static String[] compactionArgs(FlinkJob flinkJob, TableMeta tableMeta, String instants, String cluster, String signature) throws JsonProcessingException { List argsList = Lists.mutable.empty(); argsList.add(Constants.FLINK_JOB_OPTION); argsList.add(MAPPER.writeValueAsString(flinkJob)); @@ -83,6 +91,8 @@ public class ExecutorService { } argsList.add(Constants.CLUSTER_OPTION); argsList.add(cluster); + argsList.add(Constants.SIGNATURE_OPTION); + argsList.add(signature); return argsList.toArray(new String[]{}); } @@ -267,6 +277,7 @@ public class ExecutorService { setEnvironment(configuration, Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJobName); setEnvironment(configuration, Constants.METRICS_LABEL_EXECUTOR_VERSION, String.valueOf(executorJarVersion)); setEnvironment(configuration, Constants.METRICS_LABEL_CLUSTER, cluster); + setEnvironment(configuration, Constants.METRICS_LABEL_SIGNATURE, hudiServiceProperties.getSignature()); setEnvironment(configuration, Constants.LOKI_PUSH_URL, hudiConfiguration.getLokiPushUrl()); @@ -280,7 +291,7 @@ public class ExecutorService { return Runner.run( configuration, "com.lanyuanxiaoyao.service.sync.Synchronizer", - syncArgs(flinkJob, tableMetaList, cluster) + syncArgs(flinkJob, tableMetaList, cluster, hudiServiceProperties.getSignature()) ); } @@ -318,6 +329,7 @@ public class ExecutorService { setEnvironment(configuration, Constants.METRICS_LABEL_BATCH_ID, batchCode); setEnvironment(configuration, Constants.METRICS_LABEL_EXECUTOR_VERSION, String.valueOf(executorJarVersion)); setEnvironment(configuration, Constants.METRICS_LABEL_CLUSTER, cluster); + setEnvironment(configuration, Constants.METRICS_LABEL_SIGNATURE, hudiServiceProperties.getSignature()); setEnvironment(configuration, Constants.LOKI_PUSH_URL, hudiConfiguration.getLokiPushUrl()); @@ -335,7 +347,7 @@ public class ExecutorService { return Runner.run( configuration, "com.lanyuanxiaoyao.service.sync.Compactor", - compactionArgs(flinkJob, tableMeta, instants, cluster) + compactionArgs(flinkJob, tableMeta, instants, cluster, hudiServiceProperties.getSignature()) ); }