diff --git a/bin/build-executor-manager.sh b/bin/build-executor-manager.sh
new file mode 100755
index 0000000..9f9f689
--- /dev/null
+++ b/bin/build-executor-manager.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+mvn -pl service-dependencies,service-configuration,service-forest,service-executor,service-executor/service-executor-core clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml
+mvn -pl service-executor/service-executor-manager clean package -D skipTests -s ~/.m2/settings-development.xml -P b2e1
+ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-executor/service-executor-manager/target/service-executor-manager-1.0.0-SNAPSHOT.jar
\ No newline at end of file
diff --git a/bin/build-executor-task.sh b/bin/build-executor-task.sh
new file mode 100755
index 0000000..e1f61de
--- /dev/null
+++ b/bin/build-executor-task.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+mvn -pl service-dependencies,service-configuration,service-forest,service-executor,service-executor/service-executor-core clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml
+mvn -pl service-executor/service-executor-task clean package -D skipTests -s ~/.m2/settings-development.xml
+ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-executor/service-executor-task/target/service-executor-task-1.0.0-SNAPSHOT.jar
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 44274bb..f32c340 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,6 +22,7 @@
service-cli
service-loki-query
service-test-query
+ service-executor
@@ -33,6 +34,10 @@
2.6.8
2021.0.3
+ 1.13
+ 2.11
+ 0.12.0-eshore-SNAPSHOT
+ 1.13.3
diff --git a/service-cli/service-cli-runner/src/main/resources/application.yml b/service-cli/service-cli-runner/src/main/resources/application.yml
index 79d86f7..4c7a35e 100644
--- a/service-cli/service-cli-runner/src/main/resources/application.yml
+++ b/service-cli/service-cli-runner/src/main/resources/application.yml
@@ -113,3 +113,6 @@ deploy:
- name: service-flink-query
source-jar: service-flink-query-1.0.0-SNAPSHOT.jar
replicas: 4
+ - name: service-executor-manager
+ source-jar: service-executor-manager-1.0.0-SNAPSHOT.jar
+ replicas: 1
diff --git a/service-configuration/src/test/java/com/lanyuanxiaoyao/service/configuration/TestSecurityDecrypt.java b/service-configuration/src/test/java/com/lanyuanxiaoyao/service/configuration/TestSecurityDecrypt.java
new file mode 100644
index 0000000..3dda84b
--- /dev/null
+++ b/service-configuration/src/test/java/com/lanyuanxiaoyao/service/configuration/TestSecurityDecrypt.java
@@ -0,0 +1,26 @@
+package com.lanyuanxiaoyao.service.configuration;
+
+import org.jasypt.encryption.pbe.PooledPBEStringEncryptor;
+import org.jasypt.encryption.pbe.config.SimpleStringPBEConfig;
+
+/**
+ * @author lanyuanxiaoyao
+ * @date 2023-12-05
+ */
+public class TestSecurityDecrypt {
+ public static void main(String[] args) {
+ PooledPBEStringEncryptor encryptor = new PooledPBEStringEncryptor();
+ SimpleStringPBEConfig config = new SimpleStringPBEConfig();
+ config.setPassword("r#(R,P\"Dp^A47>WSn:Wn].gs/+\"v:q_Q*An~zF*g-@j@jtSTv5H/,S-3:R?r9R}.");
+ config.setAlgorithm("PBEWITHHMACSHA512ANDAES_256");
+ config.setKeyObtentionIterations("1000");
+ config.setPoolSize("1");
+ config.setProviderName("SunJCE");
+ config.setSaltGeneratorClassName("org.jasypt.salt.RandomSaltGenerator");
+ config.setIvGeneratorClassName("org.jasypt.iv.RandomIvGenerator");
+ config.setStringOutputType("base64");
+ encryptor.setConfig(config);
+
+ System.out.println(encryptor.decrypt("GXKnbq1LS11U2HaONspvH+D/TkIx13aWTaokdkzaF7HSvq6Z0Rv1+JUWFnYopVXu"));
+ }
+}
diff --git a/service-executor/pom.xml b/service-executor/pom.xml
new file mode 100644
index 0000000..895214b
--- /dev/null
+++ b/service-executor/pom.xml
@@ -0,0 +1,75 @@
+
+
+ 4.0.0
+
+ com.lanyuanxiaoyao
+ hudi-service
+ 1.0.0-SNAPSHOT
+
+
+ service-executor
+ pom
+
+ service-executor-core
+ service-executor-manager
+ service-executor-task
+
+
+
+
+
+ org.apache.flink
+ flink-java
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.major.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-clients_${scala.major.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-runtime-blink_${scala.major.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-planner-blink_${scala.major.version}
+ ${flink.version}
+
+
+ org.apache.pulsar
+ pulsar-client
+ ${pulsar.version}
+
+
+ org.apache.pulsar
+ pulsar-client-admin
+ ${pulsar.version}
+
+
+ org.apache.hudi
+ hudi-flink${flink.major.version}-bundle
+ ${hudi.version}
+
+
+ org.apache.flink
+ flink-metrics-prometheus_${scala.major.version}
+ ${flink.version}
+
+
+
+
+
\ No newline at end of file
diff --git a/service-executor/service-executor-core/pom.xml b/service-executor/service-executor-core/pom.xml
new file mode 100644
index 0000000..5002e85
--- /dev/null
+++ b/service-executor/service-executor-core/pom.xml
@@ -0,0 +1,14 @@
+
+
+ 4.0.0
+
+ com.lanyuanxiaoyao
+ service-executor
+ 1.0.0-SNAPSHOT
+
+
+ service-executor-core
+
+
\ No newline at end of file
diff --git a/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/Task.java b/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/Task.java
new file mode 100644
index 0000000..15b36dd
--- /dev/null
+++ b/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/Task.java
@@ -0,0 +1,12 @@
+package com.lanyuanxiaoyao.service.executor.core;
+
+import java.io.Serializable;
+
+/**
+ * flink 执行任务信息
+ *
+ * @author lanyuanxiaoyao
+ * @date 2023-12-04
+ */
+public class Task implements Serializable {
+}
diff --git a/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskConstants.java b/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskConstants.java
new file mode 100644
index 0000000..adf8c9c
--- /dev/null
+++ b/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskConstants.java
@@ -0,0 +1,12 @@
+package com.lanyuanxiaoyao.service.executor.core;
+
+/**
+ * 一些字符串
+ *
+ * @author lanyuanxiaoyao
+ * @date 2023-12-04
+ */
+public interface TaskConstants {
+ String TASK_CONTEXT = "task-context";
+ String TASK_CONTEXT_OPTION = "-" + TASK_CONTEXT;
+}
diff --git a/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskContext.java b/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskContext.java
new file mode 100644
index 0000000..6a6be07
--- /dev/null
+++ b/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskContext.java
@@ -0,0 +1,32 @@
+package com.lanyuanxiaoyao.service.executor.core;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * flink 任务信息上下文
+ *
+ * @author lanyuanxiaoyao
+ * @date 2023-12-04
+ */
+public class TaskContext implements Serializable {
+ private Map metadata;
+
+ public TaskContext() {
+ }
+
+ public TaskContext(Map metadata) {
+ this.metadata = metadata;
+ }
+
+ public Map getMetadata() {
+ return metadata;
+ }
+
+ @Override
+ public String toString() {
+ return "TaskContext{" +
+ "metadata=" + metadata +
+ '}';
+ }
+}
diff --git a/service-executor/service-executor-manager/pom.xml b/service-executor/service-executor-manager/pom.xml
new file mode 100644
index 0000000..223e60a
--- /dev/null
+++ b/service-executor/service-executor-manager/pom.xml
@@ -0,0 +1,161 @@
+
+
+ 4.0.0
+
+ com.lanyuanxiaoyao
+ service-executor
+ 1.0.0-SNAPSHOT
+
+
+ service-executor-manager
+
+
+
+ com.lanyuanxiaoyao
+ service-executor-core
+ 1.0.0-SNAPSHOT
+
+
+ com.lanyuanxiaoyao
+ service-dependencies
+ 1.0.0-SNAPSHOT
+
+
+ com.google.guava
+ guava
+
+
+
+
+ com.lanyuanxiaoyao
+ service-configuration
+ 1.0.0-SNAPSHOT
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
+ org.apache.logging.log4j
+ log4j-to-slf4j
+
+
+
+
+ com.sun.jersey
+ jersey-client
+ 1.19.4
+
+
+ com.sun.jersey
+ jersey-core
+ 1.19.4
+
+
+ com.sun.jersey.contribs
+ jersey-apache-client4
+ 1.19.4
+
+
+ org.apache.hudi
+ hudi-flink${flink.major.version}-bundle
+ ${hudi.version}
+
+
+ com.eshore.odcp.hudi.connector
+ executor
+ 1.0.0-SNAPSHOT
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+ 3.2.0
+
+
+ copy-config-file
+ validate
+
+ copy-resources
+
+
+ ${project.build.directory}/classes
+
+
+ ${project.parent.parent.basedir}/config/${build-tag}
+
+ *.xml
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.3.0
+
+ false
+ true
+
+
+ META-INF/spring.handlers
+
+
+ META-INF/spring.factories
+
+
+ META-INF/spring.schemas
+
+
+ com.lanyuanxiaoyao.service.executor.manager.ExecutorManagerApplication
+
+
+ reference.conf
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+ log4j-surefire*.properties
+
+
+
+
+
+
+ package
+
+ shade
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring-boot.version}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/ExecutorManagerApplication.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/ExecutorManagerApplication.java
new file mode 100644
index 0000000..de1918e
--- /dev/null
+++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/ExecutorManagerApplication.java
@@ -0,0 +1,97 @@
+package com.lanyuanxiaoyao.service.executor.manager;
+
+import com.eshore.odcp.hudi.connector.utils.executor.Runner;
+import com.lanyuanxiaoyao.service.executor.core.TaskConstants;
+import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
+import java.time.Duration;
+import java.util.ArrayList;
+import org.apache.flink.client.cli.ClientOptions;
+import org.apache.flink.configuration.*;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+import org.springframework.retry.annotation.EnableRetry;
+
+import static com.eshore.odcp.hudi.connector.Constants.HALF_HOUR;
+import static com.eshore.odcp.hudi.connector.Constants.MINUTE;
+
+/**
+ * @author lanyuanxiaoyao
+ * @date 2023-12-04
+ */
+@EnableDiscoveryClient
+@SpringBootApplication(
+ scanBasePackages = {"com.lanyuanxiaoyao.service"},
+ exclude = {
+ GsonAutoConfiguration.class,
+ DataSourceAutoConfiguration.class
+ }
+)
+@EnableConfigurationProperties
+@EnableEncryptableProperties
+@EnableRetry
+public class ExecutorManagerApplication implements ApplicationRunner {
+ public static void main(String[] args) {
+ SpringApplication.run(ExecutorManagerApplication.class, args);
+ }
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.setBoolean(JobManagerOptions.JVM_DIRECT_MEMORY_LIMIT_ENABLED, true);
+ configuration.setString(AkkaOptions.ASK_TIMEOUT, "10 min");
+ configuration.setString(AkkaOptions.TCP_TIMEOUT, "15 min");
+ configuration.setString(AkkaOptions.LOOKUP_TIMEOUT, "10 min");
+ configuration.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(30));
+ // Kerberos认证
+ configuration.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, true);
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, "/etc/security/keytabs/datalake.app.keytab");
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "datalake/b5s119.hdp.dc@ECLD.COM");
+ configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, MINUTE);
+ configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, HALF_HOUR);
+ configuration.setString(AkkaOptions.ASK_TIMEOUT, "1 min");
+ configuration.setString(AkkaOptions.TCP_TIMEOUT, "2 min");
+ configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+ configuration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "4");
+ configuration.setString(YarnConfigOptions.STAGING_DIRECTORY, "hdfs://b2/apps/datalake/yarn");
+ configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
+ configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
+ configuration.setInteger(RestOptions.PORT, 8081);
+ configuration.setString(RestOptions.BIND_PORT, "8084-9400");
+ configuration.setString(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
+ configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("5120m"));
+ configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.parse("128m"));
+ configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1024m"));
+ configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("0m"));
+ configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.parse("128m"));
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10);
+ configuration.setString(YarnConfigOptions.APPLICATION_NAME, "HudiService_faee2e95-660d-4b1c-9cec-13473b3cd5b7");
+ configuration.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "hdfs://b2/apps/flink/completed-jobs/");
+ configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000);
+
+ configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ configuration.setString(YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, "/app-logs");
+
+ configuration.set(PipelineOptions.JARS, new ArrayList() {{
+ add("hdfs://b2/apps/datalake/jars/service/service-executor-task.jar");
+ }});
+ ApplicationId applicationId = Runner.run(
+ configuration,
+ "com.lanyuanxiaoyao.service.executor.task.Hello",
+ new String[]{
+ TaskConstants.TASK_CONTEXT_OPTION,
+ "{\"metadata\": {\"id\":\"faee2e95-660d-4b1c-9cec-13473b3cd5b7\"}}"
+ }
+ );
+ System.out.println(applicationId);
+ }
+}
diff --git a/service-executor/service-executor-manager/src/main/resources/application.yml b/service-executor/service-executor-manager/src/main/resources/application.yml
new file mode 100644
index 0000000..bd00df3
--- /dev/null
+++ b/service-executor/service-executor-manager/src/main/resources/application.yml
@@ -0,0 +1,5 @@
+spring:
+ application:
+ name: service-executor-manager
+ profiles:
+ include: random-port,common,discovery,metrics
\ No newline at end of file
diff --git a/service-executor/service-executor-manager/src/main/resources/logback-spring.xml b/service-executor/service-executor-manager/src/main/resources/logback-spring.xml
new file mode 100644
index 0000000..f955921
--- /dev/null
+++ b/service-executor/service-executor-manager/src/main/resources/logback-spring.xml
@@ -0,0 +1,51 @@
+
+
+
+
+
+
+
+
+
+
+ true
+
+ ${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push}
+
+
+
+
+ ${FILE_LOG_PATTERN:-%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} [${HOSTNAME}] ${LOG_LEVEL_PATTERN:-%5p} ${PID:- } --- [%t] %-40.40logger{39} #@# : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}
+
+ true
+
+
+
+
+
+ ${CONSOLE_LOG_PATTERN:-%clr(%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}
+
+
+
+
+ ${LOGGING_PARENT:-.}/${APP_NAME:-run}.log
+
+ ${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz
+ 7
+
+
+ ${FILE_LOG_PATTERN:-%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} [${HOSTNAME}] ${LOG_LEVEL_PATTERN:-%5p} ${PID:- } --- [%t] %-40.40logger{39} #@# : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/service-executor/service-executor-task/pom.xml b/service-executor/service-executor-task/pom.xml
new file mode 100644
index 0000000..4c5f793
--- /dev/null
+++ b/service-executor/service-executor-task/pom.xml
@@ -0,0 +1,110 @@
+
+
+ 4.0.0
+
+ com.lanyuanxiaoyao
+ service-executor
+ 1.0.0-SNAPSHOT
+
+
+ service-executor-task
+
+
+
+ com.lanyuanxiaoyao
+ service-executor-core
+ 1.0.0-SNAPSHOT
+ provided
+
+
+ com.lanyuanxiaoyao
+ service-configuration
+ 1.0.0-SNAPSHOT
+ provided
+
+
+ org.apache.hudi
+ hudi-flink${flink.major.version}-bundle
+ provided
+
+
+ org.apache.flink
+ flink-java
+ provided
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.major.version}
+ provided
+
+
+ org.apache.flink
+ flink-clients_${scala.major.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-common
+ provided
+
+
+ org.apache.flink
+ flink-table-runtime-blink_${scala.major.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-planner-blink_${scala.major.version}
+ provided
+
+
+ org.apache.flink
+ flink-shaded-guava
+ 30.1.1-jre-15.0
+ provided
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.3.0
+
+ false
+ true
+
+
+ reference.conf
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+ log4j-surefire*.properties
+
+
+
+
+
+
+ package
+
+ shade
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/Hello.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/Hello.java
new file mode 100644
index 0000000..4423698
--- /dev/null
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/Hello.java
@@ -0,0 +1,50 @@
+package com.lanyuanxiaoyao.service.executor.task;
+
+import com.lanyuanxiaoyao.service.executor.core.TaskContext;
+import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hello world
+ *
+ * @author lanyuanxiaoyao
+ * @date 2023-12-04
+ */
+public class Hello {
+ private static final Logger logger = LoggerFactory.getLogger(Hello.class);
+
+ public static void main(String[] args) throws Exception {
+ TaskContext taskContext = ArgumentsHelper.getContext(args);
+ logger.info("Context: {}", taskContext);
+
+ StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ environment
+ .addSource(new SourceFunction() {
+ @Override
+ public void run(SourceContext context) {
+ for (int index = 0; index < 10; index++) {
+ context.collect(index);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ })
+ .map(value -> "Index: " + value)
+ .addSink(new SinkFunction() {
+ @Override
+ public void invoke(String value, Context context) throws Exception {
+ logger.info("Value: {}", value);
+ }
+ });
+
+
+ environment.execute("Service task: Hello");
+ }
+}
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/ArgumentsHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/ArgumentsHelper.java
new file mode 100644
index 0000000..6505ea9
--- /dev/null
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/ArgumentsHelper.java
@@ -0,0 +1,23 @@
+package com.lanyuanxiaoyao.service.executor.task.helper;
+
+import cn.hutool.core.util.StrUtil;
+import com.lanyuanxiaoyao.service.executor.core.TaskConstants;
+import com.lanyuanxiaoyao.service.executor.core.TaskContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * 入参解析相关内容
+ *
+ * @author ZhangJiacheng
+ * @date 2022-03-10
+ */
+public class ArgumentsHelper {
+ public static TaskContext getContext(String[] args) throws JsonProcessingException {
+ ParameterTool argsTool = ParameterTool.fromArgs(args);
+ if (!argsTool.has(TaskConstants.TASK_CONTEXT)) {
+ throw new RuntimeException(StrUtil.format("Miss argument: {}", TaskConstants.TASK_CONTEXT));
+ }
+ return JacksonHelper.getMapper().readValue(argsTool.get(TaskConstants.TASK_CONTEXT), TaskContext.class);
+ }
+}
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/JacksonHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/JacksonHelper.java
new file mode 100644
index 0000000..5325baa
--- /dev/null
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/JacksonHelper.java
@@ -0,0 +1,31 @@
+package com.lanyuanxiaoyao.service.executor.task.helper;
+
+import cn.hutool.core.util.ObjectUtil;
+import java.io.Serializable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Json 解析相关工具
+ *
+ * @author ZhangJiacheng
+ * @date 2022-06-12
+ */
+public class JacksonHelper implements Serializable {
+ private static final Logger logger = LoggerFactory.getLogger(JacksonHelper.class);
+
+ private static ObjectMapper INSTANCE = null;
+
+ public static ObjectMapper getMapper() {
+ if (ObjectUtil.isNull(INSTANCE)) {
+ INSTANCE = new ObjectMapper();
+ INSTANCE.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
+ INSTANCE.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ INSTANCE.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
+ }
+ return INSTANCE;
+ }
+}