feature(executor): 增加Flink集群任务模块

新增 executor-manager 和 executor-task
manager管理任务信息启停等,task执行业务方法
This commit is contained in:
2023-12-05 14:53:15 +08:00
parent 5450559470
commit 62bfc08fc3
18 changed files with 715 additions and 0 deletions

View File

@@ -0,0 +1,50 @@
package com.lanyuanxiaoyao.service.executor.task;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Hello world
*
* @author lanyuanxiaoyao
* @date 2023-12-04
*/
public class Hello {
private static final Logger logger = LoggerFactory.getLogger(Hello.class);
public static void main(String[] args) throws Exception {
TaskContext taskContext = ArgumentsHelper.getContext(args);
logger.info("Context: {}", taskContext);
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment
.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> context) {
for (int index = 0; index < 10; index++) {
context.collect(index);
}
}
@Override
public void cancel() {
}
})
.map(value -> "Index: " + value)
.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
logger.info("Value: {}", value);
}
});
environment.execute("Service task: Hello");
}
}

View File

@@ -0,0 +1,23 @@
package com.lanyuanxiaoyao.service.executor.task.helper;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.executor.core.TaskConstants;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
/**
* 入参解析相关内容
*
* @author ZhangJiacheng
* @date 2022-03-10
*/
public class ArgumentsHelper {
public static TaskContext getContext(String[] args) throws JsonProcessingException {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(TaskConstants.TASK_CONTEXT)) {
throw new RuntimeException(StrUtil.format("Miss argument: {}", TaskConstants.TASK_CONTEXT));
}
return JacksonHelper.getMapper().readValue(argsTool.get(TaskConstants.TASK_CONTEXT), TaskContext.class);
}
}

View File

@@ -0,0 +1,31 @@
package com.lanyuanxiaoyao.service.executor.task.helper;
import cn.hutool.core.util.ObjectUtil;
import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Json 解析相关工具
*
* @author ZhangJiacheng
* @date 2022-06-12
*/
public class JacksonHelper implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(JacksonHelper.class);
private static ObjectMapper INSTANCE = null;
public static ObjectMapper getMapper() {
if (ObjectUtil.isNull(INSTANCE)) {
INSTANCE = new ObjectMapper();
INSTANCE.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
INSTANCE.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
INSTANCE.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
}
return INSTANCE;
}
}