From e8fe8c4680fbe83721b8ad55774a9bf4baf536e4 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Mon, 20 May 2024 15:25:10 +0800 Subject: [PATCH] =?UTF-8?q?feat(executor-task):=20=E4=BC=98=E5=8C=96task?= =?UTF-8?q?=E7=9A=84=E9=85=8D=E7=BD=AE=E4=BF=A1=E6=81=AF=E4=BC=A0=E9=80=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/command/commands/ToolCommand.java | 2 -- .../service/executor/task/SQLExecutor.java | 21 ++++++------------- .../service/executor/task/TableSummary.java | 11 ++++++---- 3 files changed, 13 insertions(+), 21 deletions(-) diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/ToolCommand.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/ToolCommand.java index 672f5b8..b613896 100644 --- a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/ToolCommand.java +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/ToolCommand.java @@ -51,7 +51,6 @@ public class ToolCommand { logger.warn("{} not found", hdfs); } }); - logger.info("Result:\n{}", results.makeString("\n")); } @ShellMethod("降级hoodie.properties") @@ -75,6 +74,5 @@ public class ToolCommand { logger.warn("{} not found", hdfs); } }); - logger.info("Result:\n{}", results.makeString("\n")); } } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/SQLExecutor.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/SQLExecutor.java index 554addf..5a8d309 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/SQLExecutor.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/SQLExecutor.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.executor.task; +import cn.hutool.core.collection.IterUtil; import cn.hutool.core.util.StrUtil; import de.vandermeer.asciitable.AsciiTable; import de.vandermeer.asciitable.CWC_LongestLine; @@ -38,10 +39,6 @@ public class SQLExecutor { private static final Logger logger = LoggerFactory.getLogger(SQLExecutor.class); protected static CloseableIterator executeStream(HoodieTableMetaClient metaClient, Function SQL) throws Exception { - return executeStream(metaClient, 10, SQL); - } - - protected static CloseableIterator executeStream(HoodieTableMetaClient metaClient, Integer parallelism, Function SQL) throws Exception { HoodieTableConfig tableConfig = metaClient.getTableConfig(); Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); @@ -55,18 +52,13 @@ public class SQLExecutor { tableConfig.getPartitionFields().ifPresent(builder::partition); tableConfig.getRecordKeyFields().ifPresent(builder::pk); builder.option("connector", "hudi"); - builder.option(FlinkOptions.PATH, metaClient.getBasePathV2()); - builder.option(FlinkOptions.READ_TASKS, parallelism); + builder.options(IterUtil.toMap(metaClient.getHadoopConf())); Table table = tableEnvironment.fromDataStream(builder.source(executionEnvironment)); return SQL.apply(table).collect(); } protected static CloseableIterator executeBatch(HoodieTableMetaClient metaClient, Function SQL) throws Exception { - return executeBatch(metaClient, 10, SQL); - } - - protected static CloseableIterator executeBatch(HoodieTableMetaClient metaClient, Integer parallelism, Function SQL) throws Exception { HoodieTableConfig tableConfig = metaClient.getTableConfig(); Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); @@ -85,12 +77,11 @@ public class SQLExecutor { }), tableConfig.getPartitionFields().map(Lists.immutable::of).orElse(Lists.immutable.empty()), tableConfig.getRecordKeyFields().map(Lists.immutable::of).orElse(Lists.immutable.empty()), - Maps.immutable.of( - FlinkOptions.PATH.key(), metaClient.getBasePathV2().toString(), - FlinkOptions.READ_TASKS.key(), parallelism.toString() - ) + Maps.immutable.ofAll(IterUtil.toMap(metaClient.getHadoopConf())) )); - return tableEnvironment.executeSql(SQL.apply(tableConfig.getTableName())).collect(); + String sql = SQL.apply(tableConfig.getTableName()); + logger.info("SQL: {}", sql); + return tableEnvironment.executeSql(sql).collect(); } protected static String generateResult(CloseableIterator iterator, ImmutableList fields) { diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSummary.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSummary.java index ac85cc9..0cab359 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSummary.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSummary.java @@ -10,6 +10,7 @@ import org.apache.flink.util.CloseableIterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.configuration.FlinkOptions; import org.eclipse.collections.api.factory.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,15 +31,17 @@ public class TableSummary extends SQLExecutor { ArgumentsHelper.checkMetadata(taskContext, "hdfs"); String hdfs = (String) taskContext.getMetadata().get("hdfs"); + Configuration configuration = new Configuration(); + configuration.setStrings(FlinkOptions.PATH.key(), hdfs); + configuration.setInt(FlinkOptions.READ_TASKS.key(), 50); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() - .setConf(new Configuration()) + .setConf(configuration) .setBasePath(hdfs) .build(); - int parallelism = HdfsHelper.baseScanParallelismPredict(HdfsHelper.basePaths(HdfsHelper.hdfsPaths(metaClient.getRawFs(), hdfs))); try (CloseableIterator iterator = executeBatch( metaClient, - parallelism, - tableName -> StrUtil.format("select count({}) as `count`, max({}) as latest_op_ts from {}", Constants.UNION_KEY_NAME, Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, tableName) + tableName -> StrUtil.format("select count({}) as `count`, max({}) as latest_op_ts from `{}`", Constants.UNION_KEY_NAME, Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, tableName) )) { HdfsHelper.createResult(FileSystem.get(metaClient.getHadoopConf()), taskContext, generateResult(iterator, Lists.immutable.of("count", "latest_op_ts"))); }