diff --git a/service-executor/service-executor-manager/pom.xml b/service-executor/service-executor-manager/pom.xml
index 10d581b..7b1e984 100644
--- a/service-executor/service-executor-manager/pom.xml
+++ b/service-executor/service-executor-manager/pom.xml
@@ -21,12 +21,6 @@
             <groupId>com.lanyuanxiaoyao</groupId>
             <artifactId>service-dependencies</artifactId>
             <version>1.0.0-SNAPSHOT</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.lanyuanxiaoyao</groupId>
@@ -68,6 +62,33 @@
             <artifactId>executor</artifactId>
             <version>1.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>3.1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
     </dependencies>
diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/ExecutorManagerApplication.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/ExecutorManagerApplication.java
index 231c959..29f76e7 100644
--- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/ExecutorManagerApplication.java
+++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/ExecutorManagerApplication.java
@@ -1,36 +1,14 @@
package com.lanyuanxiaoyao.service.executor.manager;
-import cn.hutool.core.util.IdUtil;
-import com.eshore.odcp.hudi.connector.utils.executor.Runner;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.lanyuanxiaoyao.service.executor.core.TaskConstants;
-import com.lanyuanxiaoyao.service.executor.core.TaskContext;
-import com.lanyuanxiaoyao.service.executor.manager.configuration.ExecutorConfiguration;
-import com.lanyuanxiaoyao.service.executor.manager.configuration.HadoopConfiguration;
import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
-import java.time.Duration;
-import java.util.ArrayList;
-import org.apache.flink.client.cli.ClientOptions;
-import org.apache.flink.configuration.*;
-import org.apache.flink.yarn.configuration.YarnConfigOptions;
-import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.eclipse.collections.api.factory.Maps;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
-import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.retry.annotation.EnableRetry;
-import static com.eshore.odcp.hudi.connector.Constants.HALF_HOUR;
-import static com.eshore.odcp.hudi.connector.Constants.MINUTE;
-
/**
* @author lanyuanxiaoyao
* @date 2023-12-04
@@ -46,77 +24,8 @@ import static com.eshore.odcp.hudi.connector.Constants.MINUTE;
@EnableConfigurationProperties
@EnableEncryptableProperties
@EnableRetry
-public class ExecutorManagerApplication implements ApplicationRunner {
- private final HadoopConfiguration hadoopConfiguration;
- private final ExecutorConfiguration executorConfiguration;
- private final ObjectMapper mapper;
-
- public ExecutorManagerApplication(HadoopConfiguration hadoopConfiguration, ExecutorConfiguration executorConfiguration, Jackson2ObjectMapperBuilder builder) {
- this.hadoopConfiguration = hadoopConfiguration;
- this.executorConfiguration = executorConfiguration;
- this.mapper = builder.build();
- }
-
+public class ExecutorManagerApplication {
public static void main(String[] args) {
SpringApplication.run(ExecutorManagerApplication.class, args);
}
-
- @Override
- public void run(ApplicationArguments args) throws Exception {
- String taskId = IdUtil.nanoId(8);
-
- Configuration configuration = new Configuration();
- configuration.setBoolean(JobManagerOptions.JVM_DIRECT_MEMORY_LIMIT_ENABLED, true);
- configuration.setString(AkkaOptions.ASK_TIMEOUT, "10 min");
- configuration.setString(AkkaOptions.TCP_TIMEOUT, "15 min");
- configuration.setString(AkkaOptions.LOOKUP_TIMEOUT, "10 min");
- configuration.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(30));
- // Kerberos authentication
- configuration.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, true);
- configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, hadoopConfiguration.getKerberosKeytabPath());
- configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, hadoopConfiguration.getKerberosPrincipal());
- configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, MINUTE);
- configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, HALF_HOUR);
- configuration.setString(AkkaOptions.ASK_TIMEOUT, "1 min");
- configuration.setString(AkkaOptions.TCP_TIMEOUT, "2 min");
- configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
- configuration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "4");
- configuration.setString(YarnConfigOptions.STAGING_DIRECTORY, executorConfiguration.getStagingDirectory());
- configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
- configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
- configuration.setInteger(RestOptions.PORT, 8081);
- configuration.setString(RestOptions.BIND_PORT, "8084-9400");
- configuration.setString(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
- configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("5120m"));
- configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.parse("128m"));
- configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1024m"));
- configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("0m"));
- configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.parse("128m"));
- configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10);
- configuration.setString(YarnConfigOptions.APPLICATION_NAME, "Service_Task_" + taskId);
- configuration.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, executorConfiguration.getHistoryServerArchiveDir());
- configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000);
-
- configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
- configuration.setString(YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, "/app-logs");
-
- configuration.set(PipelineOptions.JARS, new ArrayList<String>() {{
- add(executorConfiguration.getTaskJarPath());
- }});
- ApplicationId applicationId = Runner.run(
- configuration,
- "com.lanyuanxiaoyao.service.executor.task.AvroScanner",
- new String[]{
- TaskConstants.TASK_CONTEXT_OPTION,
- mapper.writeValueAsString(
- new TaskContext(
- taskId,
- executorConfiguration.getTaskResultPath(),
- Maps.mutable.of("key", "123456", "hdfs", "hdfs://b2/apps/datalake/hive/dws_test/external_table_hudi/dws_ord_prod_inst_attr")
- )
- )
- }
- );
- System.out.println(applicationId);
- }
}
diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/TaskController.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/TaskController.java
new file mode 100644
index 0000000..3acdba1
--- /dev/null
+++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/TaskController.java
@@ -0,0 +1,47 @@
+package com.lanyuanxiaoyao.service.executor.manager.controller;
+
+import com.lanyuanxiaoyao.service.executor.manager.service.TaskService;
+import java.io.IOException;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @author lanyuanxiaoyao
+ * @date 2024-01-10
+ */
+@RestController
+@RequestMapping("task")
+public class TaskController {
+ private static final Logger logger = LoggerFactory.getLogger(TaskController.class);
+
+ private final TaskService taskService;
+
+ public TaskController(TaskService taskService) {
+ this.taskService = taskService;
+ }
+
+ @GetMapping("scan")
+ public String scan(
+ @RequestParam("hdfs") String hdfs,
+ @RequestParam("key") String key,
+ @RequestParam(value = "scan_log", defaultValue = "true") Boolean scanLog,
+ @RequestParam(value = "scan_data", defaultValue = "false") Boolean scanData,
+ @RequestParam(value = "scan_source", defaultValue = "false") Boolean scanSource,
+ @RequestParam(value = "scan_target", defaultValue = "false") Boolean scanTarget
+ ) throws Exception {
+ return taskService.scanAvro(hdfs, key, scanLog, scanData, scanSource, scanTarget);
+ }
+
+ @GetMapping("results")
+ public ImmutableList<String> results(
+ @RequestParam("task_id") String taskId,
+ @RequestParam(value = "limit", defaultValue = "1000") Integer limit
+ ) throws IOException {
+ return taskService.taskResult(taskId, limit);
+ }
+}
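
For orientation, a minimal sketch of calling the two new endpoints once the manager is running; the host, port, and task id below are assumptions for illustration, not part of this change:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TaskEndpointsSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // GET /task/scan submits the Flink job; the body is the YARN application id.
        HttpRequest scan = HttpRequest.newBuilder(URI.create(
                "http://localhost:8080/task/scan?hdfs=hdfs://b2/apps/datalake/some_table&key=123456&scan_data=true"))
                .GET().build();
        System.out.println(client.send(scan, HttpResponse.BodyHandlers.ofString()).body());

        // GET /task/results fetches up to `limit` matched lines for a task id.
        HttpRequest results = HttpRequest.newBuilder(URI.create(
                "http://localhost:8080/task/results?task_id=abc12345&limit=100"))
                .GET().build();
        System.out.println(client.send(results, HttpResponse.BodyHandlers.ofString()).body());
    }
}
```
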
diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java
new file mode 100644
index 0000000..5677437
--- /dev/null
+++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java
@@ -0,0 +1,154 @@
+package com.lanyuanxiaoyao.service.executor.manager.service;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.util.IdUtil;
+import cn.hutool.core.util.StrUtil;
+import com.eshore.odcp.hudi.connector.utils.executor.Runner;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.lanyuanxiaoyao.service.executor.core.TaskConstants;
+import com.lanyuanxiaoyao.service.executor.core.TaskContext;
+import com.lanyuanxiaoyao.service.executor.manager.configuration.ExecutorConfiguration;
+import com.lanyuanxiaoyao.service.executor.manager.configuration.HadoopConfiguration;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.util.ArrayList;
+import org.apache.flink.client.cli.ClientOptions;
+import org.apache.flink.configuration.*;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.eclipse.collections.api.factory.Lists;
+import org.eclipse.collections.api.factory.Maps;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.eclipse.collections.api.list.MutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
+import org.springframework.retry.annotation.Retryable;
+import org.springframework.stereotype.Service;
+
+import static com.eshore.odcp.hudi.connector.Constants.HALF_HOUR;
+import static com.eshore.odcp.hudi.connector.Constants.MINUTE;
+
+/**
+ * @author lanyuanxiaoyao
+ * @date 2024-01-10
+ */
+@Service
+public class TaskService {
+ private static final Logger logger = LoggerFactory.getLogger(TaskService.class);
+
+ private final HadoopConfiguration hadoopConfiguration;
+ private final ExecutorConfiguration executorConfiguration;
+ private final ObjectMapper mapper;
+
+ public TaskService(HadoopConfiguration hadoopConfiguration, ExecutorConfiguration executorConfiguration, Jackson2ObjectMapperBuilder builder) {
+ this.hadoopConfiguration = hadoopConfiguration;
+ this.executorConfiguration = executorConfiguration;
+ this.mapper = builder.build();
+ }
+
+ private String taskId() {
+ return IdUtil.nanoId(8);
+ }
+
+ private Configuration generateConfiguration(String taskId, String name) {
+ Configuration configuration = new Configuration();
+ configuration.setBoolean(JobManagerOptions.JVM_DIRECT_MEMORY_LIMIT_ENABLED, true);
+ configuration.setString(AkkaOptions.ASK_TIMEOUT, "10 min");
+ configuration.setString(AkkaOptions.TCP_TIMEOUT, "15 min");
+ configuration.setString(AkkaOptions.LOOKUP_TIMEOUT, "10 min");
+ configuration.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(30));
+ // Kerberos authentication
+ configuration.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, true);
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, hadoopConfiguration.getKerberosKeytabPath());
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, hadoopConfiguration.getKerberosPrincipal());
+ configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, MINUTE);
+ configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, HALF_HOUR);
+ configuration.setString(AkkaOptions.ASK_TIMEOUT, "1 min");
+ configuration.setString(AkkaOptions.TCP_TIMEOUT, "2 min");
+ configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+ configuration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "4");
+ configuration.setString(YarnConfigOptions.STAGING_DIRECTORY, executorConfiguration.getStagingDirectory());
+ configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
+ configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
+ configuration.setInteger(RestOptions.PORT, 8081);
+ configuration.setString(RestOptions.BIND_PORT, "8084-9400");
+ configuration.setString(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
+ configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("2048m"));
+ configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.parse("128m"));
+ configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("10240m"));
+ configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("0m"));
+ configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.parse("128m"));
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10);
+ configuration.setString(YarnConfigOptions.APPLICATION_NAME, "Service_Task_" + name + "_" + taskId);
+ configuration.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, executorConfiguration.getHistoryServerArchiveDir());
+ configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000);
+
+ configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ configuration.setString(YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, "/app-logs");
+
+ configuration.set(PipelineOptions.JARS, new ArrayList<String>() {{
+ add(executorConfiguration.getTaskJarPath());
+ }});
+ return configuration;
+ }
+
+ public String scanAvro(String hdfs, String key, Boolean scanLog, Boolean scanData, Boolean scanSource, Boolean scanTarget) throws Exception {
+ String taskId = taskId();
+ ApplicationId applicationId = Runner.run(
+ generateConfiguration(taskId, "scan"),
+ "com.lanyuanxiaoyao.service.executor.task.DataScanner",
+ new String[]{
+ TaskConstants.TASK_CONTEXT_OPTION,
+ mapper.writeValueAsString(
+ new TaskContext(
+ taskId,
+ executorConfiguration.getTaskResultPath(),
+ Maps.mutable.of(
+ "key",
+ key,
+ "hdfs",
+ hdfs,
+ "scan_log",
+ scanLog,
+ "scan_data",
+ scanData
+ )
+ )
+ )
+ }
+ );
+ return applicationId.toString();
+ }
+
+ @Cacheable(value = "results", sync = true)
+ @Retryable(Throwable.class)
+ public ImmutableList<String> taskResult(String taskId, Integer limit) throws IOException {
+ Path resultPath = new Path(executorConfiguration.getTaskResultPath(), taskId);
+ MutableList<String> results = Lists.mutable.empty();
+ try (FileSystem fileSystem = FileSystem.get(new org.apache.hadoop.conf.Configuration())) {
+ if (!fileSystem.exists(resultPath)) {
+ throw new RuntimeException(StrUtil.format("Task {} result not found", taskId));
+ }
+ for (FileStatus status : fileSystem.listStatus(resultPath)) {
+ if (status.isFile() && results.size() < limit) {
+ try (FSDataInputStream file = fileSystem.open(status.getPath())) {
+ IoUtil.readLines(file, Charset.defaultCharset(), results);
+ }
+ }
+ }
+ }
+ return results
+ .reject(StrUtil::isBlank)
+ .collect(StrUtil::trim)
+ .toImmutable();
+ }
+}
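
One caveat on taskResult above: FileSystem.get(conf) returns a JVM-cached, shared instance by default, so closing it in try-with-resources also closes it for any other caller in the process. A sketch of the safer pattern, assuming default caching is left on, is FileSystem.newInstance, which hands back a private handle:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileSystemHandleSketch {
    public static void main(String[] args) throws Exception {
        // newInstance() bypasses the FileSystem cache, so this close()
        // cannot invalidate the shared instance that FileSystem.get() returns.
        try (FileSystem fs = FileSystem.newInstance(new Configuration())) {
            System.out.println(fs.exists(new Path("/tmp")));
        }
    }
}
```
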
diff --git a/service-executor/service-executor-task/pom.xml b/service-executor/service-executor-task/pom.xml
index fda909c..408b07a 100644
--- a/service-executor/service-executor-task/pom.xml
+++ b/service-executor/service-executor-task/pom.xml
@@ -69,6 +69,7 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
             <version>3.1.2</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.eclipse.collections</groupId>
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/AvroScanner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java
similarity index 58%
rename from service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/AvroScanner.java
rename to service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java
index 132a6b0..98a74d1 100644
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/AvroScanner.java
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java
@@ -3,14 +3,17 @@ package com.lanyuanxiaoyao.service.executor.task;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
-import com.lanyuanxiaoyao.service.executor.task.functions.ReadLogFile;
+import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile;
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
+import java.io.IOException;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
@@ -22,8 +25,8 @@ import org.slf4j.LoggerFactory;
* @author lanyuanxiaoyao
* @date 2024-01-08
*/
-public class AvroScanner {
- private static final Logger logger = LoggerFactory.getLogger(AvroScanner.class);
+public class DataScanner {
+ private static final Logger logger = LoggerFactory.getLogger(DataScanner.class);
public static void main(String[] args) throws Exception {
TaskContext taskContext = ArgumentsHelper.getContext(args);
@@ -34,6 +37,11 @@ public class AvroScanner {
String hdfs = (String) metadata.get("hdfs");
ArgumentsHelper.checkMetadata(taskContext, "key");
String key = (String) metadata.get("key");
+ Boolean scanLog = (Boolean) metadata.getOrDefault("scan_log", true);
+ Boolean scanData = (Boolean) metadata.getOrDefault("scan_data", false);
+ if (!scanLog && !scanData) {
+ throw new RuntimeException("Must choose mode scan_log or scan_data");
+ }
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(configuration);
@@ -43,13 +51,29 @@ public class AvroScanner {
ImmutableList<String> paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs)))
.reject(status -> StrUtil.equals(".hoodie", status.getPath().getName()))
- .collect(status -> status.getPath().toString());
+ .flatCollect(status -> {
+ try {
+ if (status.isDirectory()) {
+ return Lists.immutable.of(fileSystem.listStatus(status.getPath()));
+ } else {
+ return Lists.immutable.of(status);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(FileStatus::getPath)
+ .select(path -> (FSUtils.isLogFile(path) && scanLog) || (FSUtils.isDataFile(path) && scanData))
+ .collect(Path::toString);
StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
- environment.setParallelism(20);
- FlinkHelper.getAllLogFilePaths(environment.fromCollection(paths.toList()))
- .flatMap(new ReadLogFile())
+ environment.setParallelism(Math.max(paths.size() / 5, 1));
+ environment.fromCollection(paths.toList())
+ .shuffle()
+ .flatMap(new ReadHudiFile())
.map(RecordView::toString)
+ .filter(line -> StrUtil.contains(line, key))
+ .disableChaining()
.sinkTo(FlinkHelper.createFileSink(taskContext));
environment.execute(StrUtil.format("Search {} in {}", key, hdfs));
}
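
The new path selection leans on the Eclipse Collections pipeline (reject / flatCollect / select / collect). A toy run of the same select/collect shape on plain strings, with made-up names, for readers unfamiliar with the API (the flatCollect step above, which expands one directory level, is skipped here):

```java
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;

public class PathPipelineSketch {
    public static void main(String[] args) {
        ImmutableList<String> selected = Lists.immutable
                .of(".hoodie", "p1/.f1.log.1", "p1/f2.parquet", "p2/f3.txt")
                .reject(name -> name.equals(".hoodie"))      // drop the metadata dir
                .select(name -> name.contains(".log.")       // keep log files
                        || name.endsWith(".parquet"))        // ...and data files
                .collect(String::toUpperCase);               // map to the final form
        System.out.println(selected); // [P1/.F1.LOG.1, P1/F2.PARQUET]
    }
}
```
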
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java
index f3dc475..c0758af 100644
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java
@@ -48,7 +48,7 @@ public class RecordView implements Serializable, Comparable<RecordView> {
@Override
public String toString() {
- return StrUtil.format("\n{} {} {}\n{}", operation, timestamp, file, data);
+ return StrUtil.format("{} {} {} {}", operation, timestamp, file, data);
}
@Override
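
The switch from a multi-line to a single-line toString is what makes the new filter(line -> StrUtil.contains(line, key)) step in DataScanner sound: each record must reach the filter and the file sink as one line. A toy check under that assumption (values hypothetical):

```java
public class SingleLineRecordSketch {
    public static void main(String[] args) {
        // One record, one line: embedded newlines would let the key and the
        // rest of the record land on different lines in the sink output.
        String view = String.format("%s %s %s %s",
                "UPSERT", "1704859200000", "hdfs://.../f1.log.1", "id=42 name=foo");
        if (view.contains("\n")) {
            throw new IllegalStateException("record views must be single-line");
        }
        System.out.println(view);
    }
}
```
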
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadLogFile.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java
similarity index 70%
rename from service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadLogFile.java
rename to service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java
index fdfd518..042e891 100644
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadLogFile.java
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java
@@ -13,26 +13,32 @@ import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.*;
import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.io.storage.HoodieParquetReader;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.org.apache.avro.util.Utf8;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Read log files
+ * Read log/data files
*
* @author lanyuanxiaoyao
* @date 2024-01-09
*/
-public class ReadLogFile implements FlatMapFunction<String, RecordView> {
- private RecordView parseData(String source, IndexedRecord record) {
+public class ReadHudiFile implements FlatMapFunction<String, RecordView> {
+ private static final Logger logger = LoggerFactory.getLogger(ReadHudiFile.class);
+
+ private RecordView parseData(String source, RecordView.Operation operation, IndexedRecord record) {
Schema schema = record.getSchema();
StringBuilder builder = new StringBuilder();
for (Schema.Field field : schema.getFields()) {
@@ -53,19 +59,41 @@ public class ReadLogFile implements FlatMapFunction<String, RecordView> {
}
String data = builder.toString();
- RecordView recordView = new RecordView(RecordView.Operation.UPSERT, data, timestamp, source);
+ RecordView recordView = new RecordView(operation, data, timestamp, source);
recordView.getAttributes().put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, latestOpTs);
return recordView;
}
@Override
- public void flatMap(String logFilePath, Collector<RecordView> out) throws IOException {
+ public void flatMap(String value, Collector<RecordView> out) throws IOException {
Configuration readerConfiguration = new Configuration();
FileSystem readerFilesystem = FileSystem.get(readerConfiguration);
- MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(readerFilesystem, new Path(logFilePath));
+ Path filePath = new Path(value);
+ if (FSUtils.isLogFile(filePath)) {
+ readLogFile(readerFilesystem, filePath, out);
+ } else if (FSUtils.isDataFile(filePath)) {
+ readDataFile(readerFilesystem, filePath, out);
+ } else {
+ logger.warn("Cannot read file format: {}", filePath);
+ }
+ }
+
+ private void readDataFile(FileSystem readerFilesystem, Path dataFilePath, Collector<RecordView> out) throws IOException {
+ HoodieParquetReader<IndexedRecord> reader = new HoodieParquetReader<>(readerFilesystem.getConf(), dataFilePath);
+ ClosableIterator<IndexedRecord> recordIterator = reader.getRecordIterator();
+ while (recordIterator.hasNext()) {
+ RecordView recordView = parseData(dataFilePath.toString(), RecordView.Operation.RESULT, recordIterator.next());
+ out.collect(recordView);
+ }
+ recordIterator.close();
+ reader.close();
+ }
+
+ private void readLogFile(FileSystem readerFilesystem, Path logFilePath, Collector<RecordView> out) throws IOException {
+ MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(readerFilesystem, logFilePath);
Schema schema = new AvroSchemaConverter().convert(Objects.requireNonNull(messageType));
- try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(new Path(logFilePath)), schema)) {
+ try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(logFilePath), schema)) {
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
Map<HoodieLogBlock.HeaderMetadataType, String> logBlockHeader = block.getLogBlockHeader();
@@ -75,7 +103,7 @@ public class ReadLogFile implements FlatMapFunction {
HoodieAvroDataBlock avroDataBlock = (HoodieAvroDataBlock) block;
try (ClosableIterator<IndexedRecord> avroDataBlockRecordIterator = avroDataBlock.getRecordIterator()) {
while (avroDataBlockRecordIterator.hasNext()) {
- RecordView recordView = parseData(logFilePath, avroDataBlockRecordIterator.next());
+ RecordView recordView = parseData(logFilePath.toString(), RecordView.Operation.UPSERT, avroDataBlockRecordIterator.next());
out.collect(recordView);
}
}
@@ -84,7 +112,7 @@ public class ReadLogFile implements FlatMapFunction {
HoodieParquetDataBlock parquetDataBlock = (HoodieParquetDataBlock) block;
try (ClosableIterator<IndexedRecord> parquetDataBlockRecordIterator = parquetDataBlock.getRecordIterator()) {
while (parquetDataBlockRecordIterator.hasNext()) {
- RecordView recordView = parseData(logFilePath, parquetDataBlockRecordIterator.next());
+ RecordView recordView = parseData(logFilePath.toString(), RecordView.Operation.UPSERT, parquetDataBlockRecordIterator.next());
out.collect(recordView);
}
}
@@ -96,12 +124,12 @@ public class ReadLogFile implements FlatMapFunction {
String keys = Arrays.stream(deleteBlock.getRecordsToDelete())
.map(deleteRecord -> deleteRecord.getHoodieKey().toString())
.collect(Collectors.joining(" "));
- out.collect(new RecordView(RecordView.Operation.DELETE, keys, instant, logFilePath));
+ out.collect(new RecordView(RecordView.Operation.DELETE, keys, instant, logFilePath.toString()));
break;
case COMMAND_BLOCK:
HoodieCommandBlock commandBlock = (HoodieCommandBlock) block;
Map<HoodieLogBlock.HeaderMetadataType, String> header = commandBlock.getLogBlockHeader();
- out.collect(new RecordView(RecordView.Operation.ROLLBACK, header.get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME), instant, logFilePath));
+ out.collect(new RecordView(RecordView.Operation.ROLLBACK, header.get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME), instant, logFilePath.toString()));
break;
default:
break;
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/FlinkHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/FlinkHelper.java
index 532a642..e74dd6a 100644
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/FlinkHelper.java
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/FlinkHelper.java
@@ -43,31 +43,4 @@ public class FlinkHelper {
.withOutputFileConfig(new OutputFileConfig("task", ""))
.build();
}
-
- public static DataStream<String> getAllFilePaths(DataStream<String> source) {
- return source
- .map(path -> {
- Configuration configuration = new Configuration();
- FileSystem fileSystem = FileSystem.get(configuration);
- FileStatus[] statuses = fileSystem.listStatus(new org.apache.hadoop.fs.Path(path));
- String[] results = new String[statuses.length];
- for (int index = 0; index < statuses.length; index++) {
- results[index] = statuses[index].getPath().toString();
- }
- return results;
- })
- .name("Read files")
- .flatMap(new FlatMapIterator<String[], String>() {
- @Override
- public Iterator<String> flatMap(String[] strings) {
- return Arrays.asList(strings).iterator();
- }
- });
- }
-
- public static DataStream<String> getAllLogFilePaths(DataStream<String> source) {
- return getAllFilePaths(source)
- .filter(FSUtils::isLogFile)
- .name("Filter log files");
- }
}
diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java
new file mode 100644
index 0000000..b83b9e8
--- /dev/null
+++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java
@@ -0,0 +1,27 @@
+package com.lanyuanxiaoyao.service.forest.service;
+
+import com.dtflys.forest.annotation.BaseRequest;
+import com.dtflys.forest.annotation.Get;
+import com.dtflys.forest.annotation.Query;
+import org.eclipse.collections.api.list.ImmutableList;
+
+/**
+ * Task service
+ *
+ * @author lanyuanxiaoyao
+ * @date 2024-01-10
+ */
+@BaseRequest(baseURL = "http://service-executor-manager")
+public interface TaskService {
+ @Get(value = "/task/scan", readTimeout = 2 * 60 * 1000)
+ String scan(@Query("hdfs") String hdfs, @Query("key") String key);
+
+ @Get(value = "/task/scan", readTimeout = 2 * 60 * 1000)
+ String scan(@Query("hdfs") String hdfs, @Query("key") String key, @Query("scan_log") Boolean scanLog, @Query("scan_data") Boolean scanData);
+
+ @Get("/task/results")
+ ImmutableList results(@Query("task_id") String taskId);
+
+ @Get("/task/results")
+ ImmutableList results(@Query("task_id") String taskId, @Query("limit") Integer limit);
+}
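
A hypothetical consumer of the Forest interface, assuming the project registers it for scanning (e.g. via @ForestScan) so Spring can inject the generated proxy:

```java
import org.eclipse.collections.api.list.ImmutableList;
import org.springframework.stereotype.Component;

@Component
public class TaskFacade {
    private final TaskService taskService; // proxy generated by Forest at runtime

    public TaskFacade(TaskService taskService) {
        this.taskService = taskService;
    }

    public String submitScan(String hdfs, String key) {
        // The manager replies with the YARN application id of the submitted Flink job.
        return taskService.scan(hdfs, key, true, false);
    }

    public ImmutableList<String> fetchResults(String taskId) {
        return taskService.results(taskId, 200);
    }
}
```
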