feat(executor-task): query table row count and latest business operation time with Flink SQL

v-zhangjc9
2024-05-17 17:32:07 +08:00
parent 592d27ad6f
commit f398b8cdc3
14 changed files with 249 additions and 90 deletions

View File

@@ -1,76 +0,0 @@
package com.lanyuanxiaoyao.service.executor.task;

import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile;
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Find the latest opts (operation timestamps) in the base files.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-22
 */
public class LatestOperationTimeScan {
    private static final Logger logger = LoggerFactory.getLogger(LatestOperationTimeScan.class);

    public static void main(String[] args) throws Exception {
        TaskContext taskContext = ArgumentsHelper.getContext(args);
        logger.info("Context: {}", taskContext);
        ArgumentsHelper.checkMetadata(taskContext, "hdfs");
        String hdfs = (String) taskContext.getMetadata().get("hdfs");
        FileSystem fileSystem = FileSystem.get(new Configuration());
        HdfsHelper.checkHdfsPath(fileSystem, hdfs);
        ImmutableList<String> basePaths = HdfsHelper.latestBasePaths(HdfsHelper.hdfsPaths(fileSystem, hdfs));
        int parallelism = HdfsHelper.baseScanParallelismPredict(basePaths);
        StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
        environment.setParallelism(parallelism);
        String maxValue = "0";
        try (CloseableIterator<String> iterator = environment
                .fromCollection(basePaths.toList())
                .name("Read base paths")
                .flatMap(new ReadHudiFile())
                .name("Read hudi file")
                .setParallelism(parallelism)
                .map(view -> (String) view.getAttributes().getOrDefault(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, "0"))
                // Scatter the timestamps over random keys so the maxima can be pre-aggregated in parallel.
                .map(time -> new Tuple2<>(RandomUtil.randomInt(parallelism), time), TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {
                }))
                .keyBy(tuple -> tuple.f0)
                .reduce(new RichReduceFunction<Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
                        return value1.f1.compareTo(value2.f1) > 0 ? value1 : value2;
                    }
                })
                .map(tuple -> tuple.f1)
                /*.sinkTo(FlinkHelper.createFileSink(taskContext))*/
                .executeAndCollect("Find latest opts")) {
            // Take the overall maximum across the per-key partial maxima.
            while (iterator.hasNext()) {
                String item = iterator.next();
                if (item.compareTo(maxValue) > 0) {
                    maxValue = item;
                }
            }
        }
        HdfsHelper.createResult(fileSystem, taskContext, StrUtil.trim(maxValue));
    }
}

View File

@@ -0,0 +1,148 @@
package com.lanyuanxiaoyao.service.executor.task;

import cn.hutool.core.util.StrUtil;
import de.vandermeer.asciitable.AsciiTable;
import de.vandermeer.asciitable.CWC_LongestLine;
import de.vandermeer.asciithemes.TA_GridThemes;
import java.util.function.Function;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.HoodiePipeline;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Query data by executing Flink SQL.
 *
 * @author lanyuanxiaoyao
 * @date 2024-05-17
 */
public class SQLExecutor {
    private static final Logger logger = LoggerFactory.getLogger(SQLExecutor.class);

    protected static CloseableIterator<Row> executeStream(HoodieTableMetaClient metaClient, Function<Table, TableResult> sql) throws Exception {
        return executeStream(metaClient, 10, sql);
    }

    protected static CloseableIterator<Row> executeStream(HoodieTableMetaClient metaClient, Integer parallelism, Function<Table, TableResult> sql) throws Exception {
        HoodieTableConfig tableConfig = metaClient.getTableConfig();
        Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
        // Build a Hudi source whose columns mirror the table's Avro schema.
        HoodiePipeline.Builder builder = HoodiePipeline.builder(tableConfig.getTableName());
        for (Schema.Field field : schema.getFields()) {
            DataType type = AvroSchemaConverter.convertToDataType(field.schema());
            builder.column(StrUtil.format("{} {}", field.name(), type.getLogicalType()));
        }
        tableConfig.getPartitionFields().ifPresent(builder::partition);
        tableConfig.getRecordKeyFields().ifPresent(builder::pk);
        builder.option("connector", "hudi");
        builder.option(FlinkOptions.PATH, metaClient.getBasePathV2());
        builder.option(FlinkOptions.READ_TASKS, parallelism);
        Table table = tableEnvironment.fromDataStream(builder.source(executionEnvironment));
        return sql.apply(table).collect();
    }

    protected static CloseableIterator<Row> executeBatch(HoodieTableMetaClient metaClient, Function<String, String> sql) throws Exception {
        return executeBatch(metaClient, 10, sql);
    }

    protected static CloseableIterator<Row> executeBatch(HoodieTableMetaClient metaClient, Integer parallelism, Function<String, String> sql) throws Exception {
        HoodieTableConfig tableConfig = metaClient.getTableConfig();
        Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
        TableEnvironment tableEnvironment = TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .inBatchMode()
                        .useBlinkPlanner()
                        .build()
        );
        // Register the Hudi table with a generated DDL, then run the caller's SQL against it.
        tableEnvironment.executeSql(createHoodieTableDDL(
                tableConfig.getTableName(),
                Lists.immutable.ofAll(schema.getFields())
                        .collect(field -> {
                            DataType type = AvroSchemaConverter.convertToDataType(field.schema());
                            return StrUtil.format("{} {}", field.name(), type.getLogicalType());
                        }),
                tableConfig.getRecordKeyFields().map(Lists.immutable::of).orElse(Lists.immutable.empty()),
                tableConfig.getPartitionFields().map(Lists.immutable::of).orElse(Lists.immutable.empty()),
                Maps.immutable.of(
                        FlinkOptions.PATH.key(), metaClient.getBasePathV2().toString(),
                        FlinkOptions.READ_TASKS.key(), parallelism.toString()
                )
        ));
        return tableEnvironment.executeSql(sql.apply(tableConfig.getTableName())).collect();
    }

    protected static String generateResult(CloseableIterator<Row> iterator, ImmutableList<String> fields) {
        AsciiTable table = new AsciiTable();
        table.addRule();
        table.addRow(fields.toArray());
        table.addRule();
        iterator.forEachRemaining(row -> table.addRow(fields.collect(row::getField).toArray()));
        table.addRule();
        table.getRenderer().setCWC(new CWC_LongestLine());
        table.getContext().setGridTheme(TA_GridThemes.NONE);
        table.setPaddingLeftRight(1);
        return table.render();
    }

    private static String createHoodieTableDDL(
            String tableName,
            ImmutableList<String> fields,
            ImmutableList<String> primaryKeys,
            ImmutableList<String> partitionKeys,
            ImmutableMap<String, String> options
    ) {
        StringBuilder builder = new StringBuilder();
        builder.append("create table if not exists ")
                .append(tableName)
                .append("(\n");
        for (String field : fields) {
            builder.append(" ")
                    .append(field)
                    .append(",\n");
        }
        builder.append(" PRIMARY KEY(")
                .append(primaryKeys.makeString(","))
                .append(") NOT ENFORCED\n")
                .append(")\n");
        if (!partitionKeys.isEmpty()) {
            String partitions = partitionKeys
                    .collect(key -> StrUtil.format("`{}`", key))
                    .makeString(",");
            builder.append("PARTITIONED BY (")
                    .append(partitions)
                    .append(")\n");
        }
        builder.append("with ('connector' = 'hudi'");
        options.forEachKeyValue((k, v) -> builder
                .append(",\n")
                .append(" '")
                .append(k)
                .append("' = '")
                .append(v)
                .append("'"));
        builder.append("\n)");
        return builder.toString();
    }
}
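
For orientation, the DDL string assembled by createHoodieTableDDL would look roughly like the following for a hypothetical table. The table name, columns, record key id, partition field dt, base path and read.tasks value are all made up here; the real column types come from AvroSchemaConverter and the field list also includes Hudi's _hoodie_* metadata columns:

create table if not exists demo_table(
 id STRING,
 ts BIGINT,
 dt STRING,
 PRIMARY KEY(id) NOT ENFORCED
)
PARTITIONED BY (`dt`)
with ('connector' = 'hudi',
 'path' = 'hdfs:///hypothetical/base/path',
 'read.tasks' = '4'
)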

View File

@@ -0,0 +1,46 @@
package com.lanyuanxiaoyao.service.executor.task;

import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.eclipse.collections.api.factory.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Count the table's total rows.
 *
 * @author lanyuanxiaoyao
 * @date 2024-05-17
 */
public class TableSummary extends SQLExecutor {
    private static final Logger logger = LoggerFactory.getLogger(TableSummary.class);

    public static void main(String[] args) throws Exception {
        TaskContext taskContext = ArgumentsHelper.getContext(args);
        logger.info("Context: {}", taskContext);
        ArgumentsHelper.checkMetadata(taskContext, "hdfs");
        String hdfs = (String) taskContext.getMetadata().get("hdfs");
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(new Configuration())
                .setBasePath(hdfs)
                .build();
        int parallelism = HdfsHelper.baseScanParallelismPredict(HdfsHelper.basePaths(HdfsHelper.hdfsPaths(metaClient.getRawFs(), hdfs)));
        try (CloseableIterator<Row> iterator = executeBatch(
                metaClient,
                parallelism,
                tableName -> StrUtil.format("select count({}) as `count`, max({}) as latest_op_ts from {}", Constants.UNION_KEY_NAME, Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, tableName)
        )) {
            HdfsHelper.createResult(FileSystem.get(metaClient.getHadoopConf()), taskContext, generateResult(iterator, Lists.immutable.of("count", "latest_op_ts")));
        }
    }
}
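
TableSummary goes through the batch path above. As a purely hypothetical sketch (not part of this commit), the streaming variant executeStream can be driven the same way by handing over a Table API pipeline instead of a SQL string; the class name, base path and preview logic below are made up for illustration:

package com.lanyuanxiaoyao.service.executor.task;

import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hypothetical example: preview a few rows through SQLExecutor#executeStream.
 */
public class StreamPreview extends SQLExecutor {
    private static final Logger logger = LoggerFactory.getLogger(StreamPreview.class);

    public static void main(String[] args) throws Exception {
        // Hypothetical base path; a real task would take it from TaskContext metadata.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(new Configuration())
                .setBasePath("hdfs:///hypothetical/base/path")
                .build();
        // Table::execute runs a plain "select *" over the Hudi source built by executeStream.
        try (CloseableIterator<Row> iterator = executeStream(metaClient, Table::execute)) {
            int previewed = 0;
            while (iterator.hasNext() && previewed < 10) {
                logger.info("Row: {}", iterator.next());
                previewed++;
            }
        }
    }
}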

View File

@@ -110,7 +110,7 @@ public class HdfsHelper {
     public static void createResult(FileSystem fileSystem, TaskContext context, String result) throws IOException {
         Path resultPath = new Path(context.getResultPath() + "/" + context.getTaskId() + "/task-result");
         try (FSDataOutputStream outputStream = fileSystem.create(resultPath)) {
-            outputStream.writeUTF(result);
+            outputStream.writeBytes(result);
         }
     }
 }
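
Background on the one-line change above: DataOutputStream.writeUTF prefixes the payload with a two-byte length and encodes it as modified UTF-8, so a task-result file written that way is not plain text, whereas writeBytes writes the low-order byte of each char and keeps ASCII output readable as-is. A minimal standalone sketch (file names made up) showing the difference:

import java.io.DataOutputStream;
import java.io.FileOutputStream;

public class WriteComparison {
    public static void main(String[] args) throws Exception {
        String result = "count: 42";
        // writeUTF: 2-byte length prefix plus modified UTF-8 payload -> not a plain-text file.
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream("result-writeUTF.bin"))) {
            out.writeUTF(result);
        }
        // writeBytes: low-order byte of each char -> ASCII content stays readable as-is.
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream("result-writeBytes.txt"))) {
            out.writeBytes(result);
        }
    }
}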

View File

@@ -0,0 +1,26 @@
package com.lanyuanxiaoyao.service.executor.task;
import de.vandermeer.asciitable.AsciiTable;
import de.vandermeer.asciitable.CWC_LongestLine;
import de.vandermeer.asciithemes.TA_GridThemes;
import de.vandermeer.asciithemes.a8.A8_Grids;
/**
* @author
* @date 2024-05-17
*/
public class ConsoleTableTest {
public static void main(String[] args) {
AsciiTable table = new AsciiTable();
table.addRule();
table.addRow("Count", "Latest Opts");
table.addRule();
table.addRow("20657434", "2024-07-07 10:50:54");
table.addRow("20657434", "2024-07-07 10:50:54");
table.addRule();
table.getRenderer().setCWC(new CWC_LongestLine());
table.getContext().setGridTheme(TA_GridThemes.FULL);
table.setPaddingLeftRight(1);
System.out.println(table.render());
}
}