feat(executor-task): 优化task的配置信息传递

This commit is contained in:
v-zhangjc9
2024-05-20 15:25:10 +08:00
parent d94589cc63
commit e8fe8c4680
3 changed files with 13 additions and 21 deletions

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.executor.task;
import cn.hutool.core.collection.IterUtil;
import cn.hutool.core.util.StrUtil;
import de.vandermeer.asciitable.AsciiTable;
import de.vandermeer.asciitable.CWC_LongestLine;
@@ -38,10 +39,6 @@ public class SQLExecutor {
private static final Logger logger = LoggerFactory.getLogger(SQLExecutor.class);
/**
 * Runs a streaming SQL query against the Hudi table using a default read
 * parallelism. Convenience overload that forwards to the parallelism-aware
 * variant of {@code executeStream}.
 *
 * @param metaClient meta client describing the target Hudi table
 * @param SQL        function that turns the registered {@link Table} into a
 *                   {@link TableResult} to collect
 * @return a closeable iterator over the result rows; caller must close it
 * @throws Exception if the underlying execution fails
 */
protected static CloseableIterator<Row> executeStream(HoodieTableMetaClient metaClient, Function<Table, TableResult> SQL) throws Exception {
    // Default number of Flink read tasks when the caller does not specify one.
    final int defaultParallelism = 10;
    return executeStream(metaClient, defaultParallelism, SQL);
}
protected static CloseableIterator<Row> executeStream(HoodieTableMetaClient metaClient, Integer parallelism, Function<Table, TableResult> SQL) throws Exception {
HoodieTableConfig tableConfig = metaClient.getTableConfig();
Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
@@ -55,18 +52,13 @@ public class SQLExecutor {
tableConfig.getPartitionFields().ifPresent(builder::partition);
tableConfig.getRecordKeyFields().ifPresent(builder::pk);
builder.option("connector", "hudi");
builder.option(FlinkOptions.PATH, metaClient.getBasePathV2());
builder.option(FlinkOptions.READ_TASKS, parallelism);
builder.options(IterUtil.toMap(metaClient.getHadoopConf()));
Table table = tableEnvironment.fromDataStream(builder.source(executionEnvironment));
return SQL.apply(table).collect();
}
/**
 * Runs a batch SQL query against the Hudi table using a default read
 * parallelism. Convenience overload that forwards to the parallelism-aware
 * variant of {@code executeBatch}.
 *
 * @param metaClient meta client describing the target Hudi table
 * @param SQL        function mapping the table name to the SQL string to run
 * @return a closeable iterator over the result rows; caller must close it
 * @throws Exception if the underlying execution fails
 */
protected static CloseableIterator<Row> executeBatch(HoodieTableMetaClient metaClient, Function<String, String> SQL) throws Exception {
    // Default number of Flink read tasks when the caller does not specify one.
    final int defaultParallelism = 10;
    return executeBatch(metaClient, defaultParallelism, SQL);
}
protected static CloseableIterator<Row> executeBatch(HoodieTableMetaClient metaClient, Integer parallelism, Function<String, String> SQL) throws Exception {
HoodieTableConfig tableConfig = metaClient.getTableConfig();
Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
@@ -85,12 +77,11 @@ public class SQLExecutor {
}),
tableConfig.getPartitionFields().map(Lists.immutable::of).orElse(Lists.immutable.empty()),
tableConfig.getRecordKeyFields().map(Lists.immutable::of).orElse(Lists.immutable.empty()),
Maps.immutable.of(
FlinkOptions.PATH.key(), metaClient.getBasePathV2().toString(),
FlinkOptions.READ_TASKS.key(), parallelism.toString()
)
Maps.immutable.ofAll(IterUtil.toMap(metaClient.getHadoopConf()))
));
return tableEnvironment.executeSql(SQL.apply(tableConfig.getTableName())).collect();
String sql = SQL.apply(tableConfig.getTableName());
logger.info("SQL: {}", sql);
return tableEnvironment.executeSql(sql).collect();
}
protected static String generateResult(CloseableIterator<Row> iterator, ImmutableList<String> fields) {

View File

@@ -10,6 +10,7 @@ import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.eclipse.collections.api.factory.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,15 +31,17 @@ public class TableSummary extends SQLExecutor {
ArgumentsHelper.checkMetadata(taskContext, "hdfs");
String hdfs = (String) taskContext.getMetadata().get("hdfs");
Configuration configuration = new Configuration();
configuration.setStrings(FlinkOptions.PATH.key(), hdfs);
configuration.setInt(FlinkOptions.READ_TASKS.key(), 50);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(new Configuration())
.setConf(configuration)
.setBasePath(hdfs)
.build();
int parallelism = HdfsHelper.baseScanParallelismPredict(HdfsHelper.basePaths(HdfsHelper.hdfsPaths(metaClient.getRawFs(), hdfs)));
try (CloseableIterator<Row> iterator = executeBatch(
metaClient,
parallelism,
tableName -> StrUtil.format("select count({}) as `count`, max({}) as latest_op_ts from {}", Constants.UNION_KEY_NAME, Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, tableName)
tableName -> StrUtil.format("select count({}) as `count`, max({}) as latest_op_ts from `{}`", Constants.UNION_KEY_NAME, Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, tableName)
)) {
HdfsHelper.createResult(FileSystem.get(metaClient.getHadoopConf()), taskContext, generateResult(iterator, Lists.immutable.of("count", "latest_op_ts")));
}