From 6f2fce43590e3b7df9f26783748b806f98b6b88f Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Mon, 3 Jun 2024 16:17:41 +0800 Subject: [PATCH] =?UTF-8?q?feat(executor-manager):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E8=A1=A8=E6=95=B0=E6=8D=AE=E6=9F=A5=E8=AF=A210=E6=9D=A1?= =?UTF-8?q?=E9=87=87=E6=A0=B7=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/ExecutorTaskController.java | 10 +++- .../manager/service/ExecutorTaskService.java | 25 ++++++++ .../service/executor/task/TableSampling.java | 57 +++++++++++++++++++ .../service/forest/service/TaskService.java | 3 + .../web/controller/TaskController.java | 12 ++++ .../resources/static/components/task-tab.js | 18 +++++- 6 files changed, 121 insertions(+), 4 deletions(-) create mode 100644 service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSampling.java diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java index c2f7dd2..523d85e 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java @@ -53,13 +53,21 @@ public class ExecutorTaskController { } @GetMapping("table_summary") - public String latestOpTs(@RequestParam("hdfs") String hdfs) throws Exception { + public String tableSummary(@RequestParam("hdfs") String hdfs) throws Exception { if (StrUtil.isBlank(hdfs)) { throw new RuntimeException("Hdfs path cannot be empty"); } return executorTaskService.tableSummary(hdfs); } + @GetMapping("table_sampling") + public String tableSampling(@RequestParam("hdfs") String hdfs) throws Exception { + if (StrUtil.isBlank(hdfs)) { + throw new RuntimeException("Hdfs path cannot be empty"); + } + return executorTaskService.tableSampling(hdfs); + } + @GetMapping("law_enforcement") public String lawEnforcement( @RequestParam("pulsar_url") String pulsarUrl, diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java index 87b009a..1316cae 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java @@ -236,6 +236,31 @@ public class ExecutorTaskService { return applicationId.toString(); } + public String tableSampling(String hdfs) throws Exception { + String taskId = taskId(); + Configuration configuration = generateConfiguration(taskId, StrUtil.format("sampling {}", hdfs)); + configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("1024m")); + MapBuilder builder = MapUtil.builder(); + + builder.put("hdfs", hdfs); + + ApplicationId applicationId = Runner.run( + configuration, + "com.lanyuanxiaoyao.service.executor.task.TableSampling", + new String[]{ + TaskConstants.TASK_CONTEXT_OPTION, + mapper.writeValueAsString( + new TaskContext( + taskId, + executorConfiguration.getTaskResultPath(), + Maps.mutable.ofMap(builder.build()) + ) + ) + } + ); + return applicationId.toString(); + } + public String lawEnforcement( String pulsarUrl, String pulsarTopic, diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSampling.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSampling.java new file mode 100644 index 0000000..e875710 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSampling.java @@ -0,0 +1,57 @@ +package com.lanyuanxiaoyao.service.executor.task; + +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.common.Constants; +import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; +import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.org.apache.avro.Schema; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.lanyuanxiaoyao.service.executor.task.SQLExecutor.executeBatch; +import static com.lanyuanxiaoyao.service.executor.task.SQLExecutor.generateResult; + +/** + * 表采样 + * + * @author lanyuanxiaoyao + */ +public class TableSampling { + private static final Logger logger = LoggerFactory.getLogger(TableSampling.class); + + public static void main(String[] args) throws Exception { + TaskContext taskContext = ArgumentsHelper.getContext(args); + logger.info("Context: {}", taskContext); + + ArgumentsHelper.checkMetadata(taskContext, "hdfs"); + String hdfs = (String) taskContext.getMetadata().get("hdfs"); + + Configuration configuration = new Configuration(); + configuration.setStrings(FlinkOptions.PATH.key(), hdfs); + configuration.setInt(FlinkOptions.READ_TASKS.key(), 50); + + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(configuration) + .setBasePath(hdfs) + .build(); + HoodieTableConfig tableConfig = metaClient.getTableConfig(); + Schema schema = tableConfig.getTableCreateSchema().orElseThrow(() -> new Exception("Cannot parse schema from " + hdfs)); + ImmutableList fields = Lists.immutable.ofAll(schema.getFields()).collect(Schema.Field::name); + try (CloseableIterator iterator = executeBatch( + metaClient, + tableName -> StrUtil.format("select {} from `{}` order by {} desc limit 10", fields.makeString(", "), tableName, Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME) + )) { + HdfsHelper.createResult(FileSystem.get(metaClient.getHadoopConf()), taskContext, generateResult(iterator, fields)); + } + } +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java index 5146913..8b16c4c 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java @@ -28,6 +28,9 @@ public interface TaskService { @Get(value = "/task/table_summary", readTimeout = 2 * 60 * 1000) String tableSummary(@Query("hdfs") String hdfs); + @Get(value = "/task/table_sampling", readTimeout = 2 * 60 * 1000) + String tableSampling(@Query("hdfs") String hdfs); + @Get("/task/results") ImmutableList results(@Query("task_id") String taskId); diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java index ee2aa0c..58f81bc 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java @@ -79,6 +79,18 @@ public class TaskController { return AmisResponse.responseSuccess(); } + @GetMapping("table_sampling") + public AmisResponse tableSampling(@RequestParam("hdfs") String hdfs) { + if (StrUtil.isBlank(hdfs)) { + throw new RuntimeException("Hdfs cannot be blank"); + } + ExecutorProvider.EXECUTORS.submit(() -> { + String applicationId = taskService.tableSampling(hdfs); + logger.info("Task: {}", applicationId); + }); + return AmisResponse.responseSuccess(); + } + @GetMapping("results") public AmisMapResponse results(@RequestParam("task_id") String taskId) { return AmisResponse.responseMapData() diff --git a/service-web/src/main/resources/static/components/task-tab.js b/service-web/src/main/resources/static/components/task-tab.js index bdedf32..25c771d 100644 --- a/service-web/src/main/resources/static/components/task-tab.js +++ b/service-web/src/main/resources/static/components/task-tab.js @@ -95,11 +95,11 @@ function taskTab() { }, { type: 'form', - title: '综合查询 (总数、最后操作时间)', + title: '综合查询', actions: [ { - type: 'submit', - label: '提交任务', + type: 'action', + label: '总数&最后操作时间', actionType: 'ajax', api: { method: 'get', @@ -109,6 +109,18 @@ function taskTab() { } } }, + { + type: 'action', + label: '最后10条记录', + actionType: 'ajax', + api: { + method: 'get', + url: '${base}/task/table_sampling', + data: { + hdfs: '${hdfs|default:undefined}', + } + } + }, ], body: [ {