From f398b8cdc33c2409c4e8b1ec789d835a0965c2c6 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Fri, 17 May 2024 17:32:07 +0800 Subject: [PATCH] =?UTF-8?q?feat(executor-task):=20=E4=BD=BF=E7=94=A8Flink?= =?UTF-8?q?=20SQL=E6=9F=A5=E8=AF=A2=E8=A1=A8=E6=80=BB=E6=95=B0=E3=80=81?= =?UTF-8?q?=E4=B8=9A=E5=8A=A1=E6=9C=80=E5=90=8E=E6=93=8D=E4=BD=9C=E6=97=B6?= =?UTF-8?q?=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 + .../service-executor-manager/pom.xml | 4 + .../controller/ExecutorTaskController.java | 4 +- .../manager/service/ExecutorTaskService.java | 6 +- .../service-executor-task/pom.xml | 5 + .../task/LatestOperationTimeScan.java | 76 --------- .../service/executor/task/SQLExecutor.java | 148 ++++++++++++++++++ .../service/executor/task/TableSummary.java | 46 ++++++ .../executor/task/helper/HdfsHelper.java | 2 +- .../executor/task/ConsoleTableTest.java | 26 +++ .../service/forest/service/TaskService.java | 4 +- .../web/controller/TaskController.java | 6 +- .../resources/static/components/common.js | 3 +- .../resources/static/components/task-tab.js | 4 +- 14 files changed, 249 insertions(+), 90 deletions(-) delete mode 100644 service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java create mode 100644 service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/SQLExecutor.java create mode 100644 service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSummary.java create mode 100644 service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ConsoleTableTest.java diff --git a/pom.xml b/pom.xml index 07e11e8..cfe0cab 100644 --- a/pom.xml +++ b/pom.xml @@ -177,6 +177,11 @@ progressbar 0.9.3 + + de.vandermeer + asciitable + 0.3.2 + diff --git a/service-executor/service-executor-manager/pom.xml b/service-executor/service-executor-manager/pom.xml index 5e0b6c6..aa49dc1 100644 --- a/service-executor/service-executor-manager/pom.xml +++ b/service-executor/service-executor-manager/pom.xml @@ -38,6 +38,10 @@ + + de.vandermeer + asciitable + com.sun.jersey jersey-client diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java index ce9165b..d1d0031 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java @@ -52,12 +52,12 @@ public class ExecutorTaskController { return executorTaskService.scanAvro(key, hdfs, pulsar, pulsarTopic, scanSource, scanQueue, scanLog, scanBase, scanTarget, filterFields); } - @GetMapping("latest_op_ts") + @GetMapping("table_summary") public String latestOpTs(@RequestParam("hdfs") String hdfs) throws Exception { if (StrUtil.isBlank(hdfs)) { throw new RuntimeException("Hdfs path cannot be empty"); } - return executorTaskService.scanLatestOpTs(hdfs); + return executorTaskService.tableSummary(hdfs); } @GetMapping("results") diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java index a0a6741..37f0066 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java @@ -209,9 +209,9 @@ public class ExecutorTaskService { return applicationId.toString(); } - public String scanLatestOpTs(String hdfs) throws Exception { + public String tableSummary(String hdfs) throws Exception { String taskId = taskId(); - Configuration configuration = generateConfiguration(taskId, StrUtil.format("latest_op_ts {}", hdfs)); + Configuration configuration = generateConfiguration(taskId, StrUtil.format("summary {}", hdfs)); configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("1024m")); MapBuilder builder = MapUtil.builder(); @@ -219,7 +219,7 @@ public class ExecutorTaskService { ApplicationId applicationId = Runner.run( configuration, - "com.lanyuanxiaoyao.service.executor.task.LatestOperationTimeScan", + "com.lanyuanxiaoyao.service.executor.task.TableSummary", new String[]{ TaskConstants.TASK_CONTEXT_OPTION, mapper.writeValueAsString( diff --git a/service-executor/service-executor-task/pom.xml b/service-executor/service-executor-task/pom.xml index 8b12f51..da512f2 100644 --- a/service-executor/service-executor-task/pom.xml +++ b/service-executor/service-executor-task/pom.xml @@ -77,6 +77,11 @@ pulsar-client provided + + de.vandermeer + asciitable + provided + diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java deleted file mode 100644 index 76f4b4f..0000000 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.lanyuanxiaoyao.service.executor.task; - -import cn.hutool.core.util.RandomUtil; -import cn.hutool.core.util.StrUtil; -import com.lanyuanxiaoyao.service.common.Constants; -import com.lanyuanxiaoyao.service.executor.core.TaskContext; -import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile; -import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; -import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper; -import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper; -import org.apache.flink.api.common.functions.RichReduceFunction; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.util.CloseableIterator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.eclipse.collections.api.list.ImmutableList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * 查找base文件最后的opts - * - * @author lanyuanxiaoyao - * @date 2024-01-22 - */ -public class LatestOperationTimeScan { - private static final Logger logger = LoggerFactory.getLogger(LatestOperationTimeScan.class); - - public static void main(String[] args) throws Exception { - TaskContext taskContext = ArgumentsHelper.getContext(args); - logger.info("Context: {}", taskContext); - - ArgumentsHelper.checkMetadata(taskContext, "hdfs"); - String hdfs = (String) taskContext.getMetadata().get("hdfs"); - - FileSystem fileSystem = FileSystem.get(new Configuration()); - HdfsHelper.checkHdfsPath(fileSystem, hdfs); - ImmutableList basePaths = HdfsHelper.latestBasePaths(HdfsHelper.hdfsPaths(fileSystem, hdfs)); - - int parallelism = HdfsHelper.baseScanParallelismPredict(basePaths); - - StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment(); - environment.setParallelism(parallelism); - String maxValue = "0"; - try (CloseableIterator iterator = environment - .fromCollection(basePaths.toList()) - .name("Read base paths") - .flatMap(new ReadHudiFile()) - .name("Read hudi file") - .setParallelism(parallelism) - .map(view -> (String) view.getAttributes().getOrDefault(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, "0")) - .map(time -> new Tuple2<>(RandomUtil.randomInt(parallelism), time), TypeInformation.of(new TypeHint>() { - })) - .keyBy(tuple -> tuple.f0) - .reduce(new RichReduceFunction>() { - @Override - public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception { - return value1.f1.compareTo(value2.f1) > 0 ? value1 : value2; - } - }) - .map(tuple -> tuple.f1) - /*.sinkTo(FlinkHelper.createFileSink(taskContext))*/ - .executeAndCollect("Find latest opts")) { - while (iterator.hasNext()) { - String item = iterator.next(); - if (item.compareTo(maxValue) > 0) { - maxValue = item; - } - } - } - HdfsHelper.createResult(fileSystem, taskContext, StrUtil.trim(maxValue)); - } -} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/SQLExecutor.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/SQLExecutor.java new file mode 100644 index 0000000..554addf --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/SQLExecutor.java @@ -0,0 +1,148 @@ +package com.lanyuanxiaoyao.service.executor.task; + +import cn.hutool.core.util.StrUtil; +import de.vandermeer.asciitable.AsciiTable; +import de.vandermeer.asciitable.CWC_LongestLine; +import de.vandermeer.asciithemes.TA_GridThemes; +import java.util.function.Function; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.org.apache.avro.Schema; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.HoodiePipeline; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.map.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 通过执行Flink SQL查数据 + * + * @author lanyuanxiaoyao + * @date 2024-05-17 + */ +public class SQLExecutor { + private static final Logger logger = LoggerFactory.getLogger(SQLExecutor.class); + + protected static CloseableIterator executeStream(HoodieTableMetaClient metaClient, Function SQL) throws Exception { + return executeStream(metaClient, 10, SQL); + } + + protected static CloseableIterator executeStream(HoodieTableMetaClient metaClient, Integer parallelism, Function SQL) throws Exception { + HoodieTableConfig tableConfig = metaClient.getTableConfig(); + Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); + + StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment); + HoodiePipeline.Builder builder = HoodiePipeline.builder(tableConfig.getTableName()); + for (Schema.Field field : schema.getFields()) { + DataType type = AvroSchemaConverter.convertToDataType(field.schema()); + builder.column(StrUtil.format("{} {}", field.name(), type.getLogicalType())); + } + tableConfig.getPartitionFields().ifPresent(builder::partition); + tableConfig.getRecordKeyFields().ifPresent(builder::pk); + builder.option("connector", "hudi"); + builder.option(FlinkOptions.PATH, metaClient.getBasePathV2()); + builder.option(FlinkOptions.READ_TASKS, parallelism); + + Table table = tableEnvironment.fromDataStream(builder.source(executionEnvironment)); + return SQL.apply(table).collect(); + } + + protected static CloseableIterator executeBatch(HoodieTableMetaClient metaClient, Function SQL) throws Exception { + return executeBatch(metaClient, 10, SQL); + } + + protected static CloseableIterator executeBatch(HoodieTableMetaClient metaClient, Integer parallelism, Function SQL) throws Exception { + HoodieTableConfig tableConfig = metaClient.getTableConfig(); + Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); + + TableEnvironment tableEnvironment = TableEnvironment.create( + EnvironmentSettings.newInstance() + .inBatchMode() + .useBlinkPlanner() + .build() + ); + tableEnvironment.executeSql(createHoodieTableDDL( + tableConfig.getTableName(), + Lists.immutable.ofAll(schema.getFields()) + .collect(field -> { + DataType type = AvroSchemaConverter.convertToDataType(field.schema()); + return StrUtil.format("{} {}", field.name(), type.getLogicalType()); + }), + tableConfig.getPartitionFields().map(Lists.immutable::of).orElse(Lists.immutable.empty()), + tableConfig.getRecordKeyFields().map(Lists.immutable::of).orElse(Lists.immutable.empty()), + Maps.immutable.of( + FlinkOptions.PATH.key(), metaClient.getBasePathV2().toString(), + FlinkOptions.READ_TASKS.key(), parallelism.toString() + ) + )); + return tableEnvironment.executeSql(SQL.apply(tableConfig.getTableName())).collect(); + } + + protected static String generateResult(CloseableIterator iterator, ImmutableList fields) { + AsciiTable table = new AsciiTable(); + table.addRule(); + table.addRow(fields.toArray()); + table.addRule(); + iterator.forEachRemaining(row -> table.addRow(fields.collect(row::getField).toArray())); + table.addRule(); + table.getRenderer().setCWC(new CWC_LongestLine()); + table.getContext().setGridTheme(TA_GridThemes.NONE); + table.setPaddingLeftRight(1); + return table.render(); + } + + private static String createHoodieTableDDL( + String tableName, + ImmutableList fields, + ImmutableList primaryKeys, + ImmutableList partitionKeys, + ImmutableMap options + ) { + StringBuilder builder = new StringBuilder(); + builder.append("create table if not exists ") + .append(tableName) + .append("(\n"); + for (String field : fields) { + builder.append(" ") + .append(field) + .append(",\n"); + } + builder.append(" PRIMARY KEY(") + .append(primaryKeys.makeString(",")) + .append(") NOT ENFORCED\n") + .append(")\n"); + if (!partitionKeys.isEmpty()) { + String partitons = partitionKeys + .collect(key -> StrUtil.format("`{}`", key)) + .makeString(","); + builder.append("PARTITIONED BY (") + .append(partitons) + .append(")\n"); + } + builder.append("with ('connector' = 'hudi'"); + options.forEachKeyValue((k, v) -> builder + .append(",\n") + .append(" '") + .append(k) + .append("' = '") + .append(v) + .append("'")); + builder.append("\n)"); + return builder.toString(); + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSummary.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSummary.java new file mode 100644 index 0000000..ac85cc9 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSummary.java @@ -0,0 +1,46 @@ +package com.lanyuanxiaoyao.service.executor.task; + +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.common.Constants; +import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; +import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.eclipse.collections.api.factory.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 统计表总数 + * + * @author lanyuanxiaoyao + * @date 2024-05-17 + */ +public class TableSummary extends SQLExecutor { + private static final Logger logger = LoggerFactory.getLogger(TableSummary.class); + + public static void main(String[] args) throws Exception { + TaskContext taskContext = ArgumentsHelper.getContext(args); + logger.info("Context: {}", taskContext); + + ArgumentsHelper.checkMetadata(taskContext, "hdfs"); + String hdfs = (String) taskContext.getMetadata().get("hdfs"); + + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(new Configuration()) + .setBasePath(hdfs) + .build(); + int parallelism = HdfsHelper.baseScanParallelismPredict(HdfsHelper.basePaths(HdfsHelper.hdfsPaths(metaClient.getRawFs(), hdfs))); + try (CloseableIterator iterator = executeBatch( + metaClient, + parallelism, + tableName -> StrUtil.format("select count({}) as `count`, max({}) as latest_op_ts from {}", Constants.UNION_KEY_NAME, Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, tableName) + )) { + HdfsHelper.createResult(FileSystem.get(metaClient.getHadoopConf()), taskContext, generateResult(iterator, Lists.immutable.of("count", "latest_op_ts"))); + } + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java index 0e88eed..8d982eb 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java @@ -110,7 +110,7 @@ public class HdfsHelper { public static void createResult(FileSystem fileSystem, TaskContext context, String result) throws IOException { Path resultPath = new Path(context.getResultPath() + "/" + context.getTaskId() + "/task-result"); try (FSDataOutputStream outputStream = fileSystem.create(resultPath)) { - outputStream.writeUTF(result); + outputStream.writeBytes(result); } } } diff --git a/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ConsoleTableTest.java b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ConsoleTableTest.java new file mode 100644 index 0000000..fe776db --- /dev/null +++ b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ConsoleTableTest.java @@ -0,0 +1,26 @@ +package com.lanyuanxiaoyao.service.executor.task; + +import de.vandermeer.asciitable.AsciiTable; +import de.vandermeer.asciitable.CWC_LongestLine; +import de.vandermeer.asciithemes.TA_GridThemes; +import de.vandermeer.asciithemes.a8.A8_Grids; + +/** + * @author + * @date 2024-05-17 + */ +public class ConsoleTableTest { + public static void main(String[] args) { + AsciiTable table = new AsciiTable(); + table.addRule(); + table.addRow("Count", "Latest Opts"); + table.addRule(); + table.addRow("20657434", "2024-07-07 10:50:54"); + table.addRow("20657434", "2024-07-07 10:50:54"); + table.addRule(); + table.getRenderer().setCWC(new CWC_LongestLine()); + table.getContext().setGridTheme(TA_GridThemes.FULL); + table.setPaddingLeftRight(1); + System.out.println(table.render()); + } +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java index 50ad930..5146913 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java @@ -25,8 +25,8 @@ public interface TaskService { @Query("filter_fields") String filterFields ); - @Get(value = "/task/latest_op_ts", readTimeout = 2 * 60 * 1000) - String latestOpTs(@Query("hdfs") String hdfs); + @Get(value = "/task/table_summary", readTimeout = 2 * 60 * 1000) + String tableSummary(@Query("hdfs") String hdfs); @Get("/task/results") ImmutableList results(@Query("task_id") String taskId); diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java index 5309170..ee2aa0c 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java @@ -67,13 +67,13 @@ public class TaskController { return AmisResponse.responseSuccess(); } - @GetMapping("latest_op_ts") - public AmisResponse latestOpTs(@RequestParam("hdfs") String hdfs) { + @GetMapping("table_summary") + public AmisResponse tableSummary(@RequestParam("hdfs") String hdfs) { if (StrUtil.isBlank(hdfs)) { throw new RuntimeException("Hdfs cannot be blank"); } ExecutorProvider.EXECUTORS.submit(() -> { - String applicationId = taskService.latestOpTs(hdfs); + String applicationId = taskService.tableSummary(hdfs); logger.info("Task: {}", applicationId); }); return AmisResponse.responseSuccess(); diff --git a/service-web/src/main/resources/static/components/common.js b/service-web/src/main/resources/static/components/common.js index 527b9a0..8cdd684 100644 --- a/service-web/src/main/resources/static/components/common.js +++ b/service-web/src/main/resources/static/components/common.js @@ -416,7 +416,8 @@ function yarnCrudColumns() { disabled: true, name: 'text', options: { - wordWrap: 'on' + wordWrap: 'on', + fontFamily: 'monospace', } } } diff --git a/service-web/src/main/resources/static/components/task-tab.js b/service-web/src/main/resources/static/components/task-tab.js index 162f02f..bdedf32 100644 --- a/service-web/src/main/resources/static/components/task-tab.js +++ b/service-web/src/main/resources/static/components/task-tab.js @@ -95,7 +95,7 @@ function taskTab() { }, { type: 'form', - title: '检索最后操作时间', + title: '综合查询 (总数、最后操作时间)', actions: [ { type: 'submit', @@ -103,7 +103,7 @@ function taskTab() { actionType: 'ajax', api: { method: 'get', - url: '${base}/task/latest_op_ts', + url: '${base}/task/table_summary', data: { hdfs: '${hdfs|default:undefined}', }