feat(executor-task): parquet文件读取增加选择列

指定列名可以提高检索速度,默认选择所有列。
This commit is contained in:
v-zhangjc9
2024-05-12 17:41:09 +08:00
parent b51176e5c2
commit a1e0b20e87
9 changed files with 172 additions and 83 deletions

View File

@@ -9,13 +9,20 @@ import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSourc
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper;
import java.util.Arrays;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.eclipse.collections.api.factory.Sets;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.set.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +42,7 @@ public class DataScanner {
Map<String, Object> metadata = taskContext.getMetadata();
ArgumentsHelper.checkMetadata(taskContext, "key");
String key = (String) metadata.get("key");
String[] keys = key.split(",");
Boolean scanQueue = (Boolean) metadata.getOrDefault("scan_queue", false);
Boolean scanLog = (Boolean) metadata.getOrDefault("scan_log", false);
Boolean scanBase = (Boolean) metadata.getOrDefault("scan_base", false);
@@ -43,9 +51,23 @@ public class DataScanner {
throw new RuntimeException("Must choose mode scan_queue or scan_log or scan_data");
}
ImmutableSet<String> filterFields = Sets.immutable.empty();
String fieldText = (String) metadata.get("filter_fields");
if (StrUtil.isNotBlank(fieldText)) {
filterFields = Sets.immutable.of(fieldText.split(",")).collect(StrUtil::trim);
}
StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
DataStream<RecordView> source = null;
DataStream<String> source = null;
BiFunction<DataStream<RecordView>, Integer, DataStream<String>> filterKeys = (stream, parallelism) -> stream
.map(RecordView::toString)
.setParallelism(parallelism)
.name("Covert record to string")
.filter(line -> StrUtil.containsAny(line, keys))
.setParallelism(parallelism)
.name("Filter target key");
Function<Integer, Integer> parallelismPredict = parallelism -> Math.max(50, Math.min(parallelism / 2, 200));
int totalParallelism = 30;
if (scanQueue) {
ArgumentsHelper.checkMetadata(taskContext, "pulsar");
@@ -53,10 +75,13 @@ public class DataScanner {
ArgumentsHelper.checkMetadata(taskContext, "pulsar_topic");
String pulsarTopic = (String) metadata.get("pulsar_topic");
logger.info("Scan queue topic: {} url: {}", pulsarTopic, pulsarUrl);
DataStream<RecordView> stream = environment
.fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic), WatermarkStrategy.noWatermarks(), "Read pulsar")
.setParallelism(totalParallelism)
.disableChaining();
DataStream<String> stream = filterKeys.apply(
environment
.fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic), WatermarkStrategy.noWatermarks(), "Read pulsar")
.setParallelism(totalParallelism)
.disableChaining(),
totalParallelism
);
if (ObjectUtil.isNull(source)) {
source = stream;
} else {
@@ -69,17 +94,24 @@ public class DataScanner {
FileSystem fileSystem = FileSystem.get(new Configuration());
HdfsHelper.checkHdfsPath(fileSystem, hdfs);
ImmutableList<FileStatus> paths = HdfsHelper.hdfsPaths(fileSystem, hdfs);
if (scanLog) {
logger.info("Scan log hdfs: {}", hdfs);
ImmutableList<String> logPaths = HdfsHelper.logPaths(fileSystem, hdfs);
ImmutableList<String> logPaths = HdfsHelper.logPaths(paths)
.collect(FileStatus::getPath)
.collect(Path::toString);
int parallelism = HdfsHelper.logScanParallelismPredict(logPaths);
totalParallelism = Math.max(totalParallelism, parallelism);
DataStream<RecordView> stream = environment
.fromCollection(logPaths.toList())
.name("Read log paths")
.flatMap(new ReadHudiFile())
.name("Read hudi file")
.setParallelism(parallelism);
DataStream<String> stream = filterKeys.apply(
environment
.fromCollection(logPaths.toList())
.name("Read log paths")
.flatMap(new ReadHudiFile(filterFields))
.name("Read log file")
.setParallelism(parallelism),
parallelismPredict.apply(totalParallelism)
);
if (ObjectUtil.isNull(source)) {
source = stream;
} else {
@@ -88,15 +120,18 @@ public class DataScanner {
}
if (scanBase) {
logger.info("Scan base hdfs: {}", hdfs);
ImmutableList<String> basePaths = HdfsHelper.latestBasePaths(fileSystem, hdfs);
ImmutableList<String> basePaths = HdfsHelper.latestBasePaths(paths);
int parallelism = HdfsHelper.baseScanParallelismPredict(basePaths);
totalParallelism = Math.max(totalParallelism, parallelism);
DataStream<RecordView> stream = environment
.fromCollection(basePaths.toList())
.name("Read base paths")
.flatMap(new ReadHudiFile())
.name("Read hudi file")
.setParallelism(parallelism);
DataStream<String> stream = filterKeys.apply(
environment
.fromCollection(basePaths.toList())
.name("Read base paths")
.flatMap(new ReadHudiFile(filterFields))
.name("Read base file")
.setParallelism(parallelism),
parallelismPredict.apply(totalParallelism)
);
if (ObjectUtil.isNull(source)) {
source = stream;
} else {
@@ -108,16 +143,10 @@ public class DataScanner {
throw new RuntimeException("Source cannot be null");
}
environment.setParallelism(Math.max(50, Math.min(totalParallelism / 2, 200)));
source
.map(RecordView::toString)
.name("Covert record to string")
.filter(line -> StrUtil.contains(line, key))
.name("Filter target key")
.sinkTo(FlinkHelper.createFileSink(taskContext))
.setParallelism(10)
.name("Output results");
environment.execute(StrUtil.format("Search {}", key));
environment.execute(StrUtil.format("Search {}", Arrays.toString(keys)));
}
}

View File

@@ -38,7 +38,7 @@ public class LatestOperationTimeScan {
FileSystem fileSystem = FileSystem.get(new Configuration());
HdfsHelper.checkHdfsPath(fileSystem, hdfs);
ImmutableList<String> basePaths = HdfsHelper.latestBasePaths(fileSystem, hdfs);
ImmutableList<String> basePaths = HdfsHelper.latestBasePaths(HdfsHelper.hdfsPaths(fileSystem, hdfs));
int parallelism = HdfsHelper.baseScanParallelismPredict(basePaths);

View File

@@ -7,6 +7,7 @@ import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
@@ -14,6 +15,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -22,11 +24,22 @@ import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.io.storage.HoodieParquetReader;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.org.apache.avro.util.Utf8;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Sets;
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.api.set.ImmutableSet;
import org.eclipse.collections.api.set.MutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +52,19 @@ import org.slf4j.LoggerFactory;
public class ReadHudiFile implements FlatMapFunction<String, RecordView> {
private static final Logger logger = LoggerFactory.getLogger(ReadHudiFile.class);
// Columns that are always kept in the projection: parseData reads the
// commit-time and latest-operation-timestamp fields for every record,
// so they must never be filtered out. Mutable so the ctor can extend it.
private final MutableSet<String> filterFields = Sets.mutable.of(
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME
);
// Default: no extra user-specified columns beyond the mandatory metadata fields.
public ReadHudiFile() {
this(Sets.immutable.empty());
}
// filterFields: additional column names to project when reading files;
// they are merged into the mandatory metadata fields above.
public ReadHudiFile(ImmutableSet<String> filterFields) {
this.filterFields.addAll(filterFields.toSet());
}
private RecordView parseData(String source, RecordView.Operation operation, IndexedRecord record) {
Schema schema = record.getSchema();
StringBuilder builder = new StringBuilder();
@@ -56,12 +82,12 @@ public class ReadHudiFile implements FlatMapFunction<String, RecordView> {
}
String timestamp = null;
Schema.Field commitTimeField = schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
if (ObjectUtil.isNotNull(commitTimeField)) {
if (ObjectUtil.isNotNull(commitTimeField) || ObjectUtil.isNotNull(record.get(commitTimeField.pos()))) {
timestamp = ((Utf8) record.get(commitTimeField.pos())).toString();
}
String latestOpTs = null;
Schema.Field latestOpTsField = schema.getField(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME);
if (ObjectUtil.isNotNull(latestOpTsField)) {
if (ObjectUtil.isNotNull(latestOpTsField) || ObjectUtil.isNotNull(record.get(latestOpTsField.pos()))) {
latestOpTs = ((Utf8) record.get(latestOpTsField.pos())).toString();
}
@@ -80,26 +106,35 @@ public class ReadHudiFile implements FlatMapFunction<String, RecordView> {
FileSystem readerFilesystem = FileSystem.get(readerConfiguration);
Path filePath = new Path(value);
if (FSUtils.isLogFile(filePath)) {
readLogFile(readerFilesystem, filePath, out);
readLogFile(readerFilesystem, filePath, out, filterFields.toImmutable());
} else if (FSUtils.isDataFile(filePath)) {
readDataFile(readerFilesystem, filePath, out);
readDataFile(readerFilesystem, filePath, out, filterFields.toImmutable());
} else {
logger.warn("Cannot read file format: {}", filePath);
}
}
private void readDataFile(FileSystem readerFilesystem, Path dataFilePath, Collector<RecordView> out) throws IOException {
try(HoodieParquetReader<IndexedRecord> reader = new HoodieParquetReader<>(readerFilesystem.getConf(), dataFilePath)) {
try(ClosableIterator<IndexedRecord> recordIterator = reader.getRecordIterator()) {
while (recordIterator.hasNext()) {
RecordView recordView = parseData(dataFilePath.toString(), RecordView.Operation.RESULT, recordIterator.next());
out.collect(recordView);
}
/**
 * Reads a Hudi parquet base file and emits one RecordView per row.
 * When filterFields is non-empty, only the matching columns are requested
 * from parquet (column projection), which speeds up the scan.
 *
 * NOTE(review): callers pass the instance set pre-seeded with the commit-time
 * and latest-operation-timestamp metadata fields, so filterFields never
 * appears empty here and the "empty means all columns" branch looks
 * unreachable — confirm against the constructors.
 */
private void readDataFile(FileSystem readerFilesystem, Path dataFilePath, Collector<RecordView> out, ImmutableSet<String> filterFields) throws IOException {
    // Copy the conf: setRequestedProjection mutates it, and the FileSystem's
    // Configuration instance may be shared with other readers in this JVM.
    Configuration configuration = new Configuration(readerFilesystem.getConf());
    ParquetUtils baseFileUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
    Schema schema = baseFileUtils.readAvroSchema(configuration, dataFilePath);
    MutableList<Schema.Field> fields = Lists.mutable.ofAll(schema.getFields())
        .select(field -> filterFields.isEmpty() || filterFields.anySatisfy(name -> StrUtil.equals(name, field.name())))
        // Schema.Field instances cannot be reused across schemas; rebuild them.
        .collect(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
        .toList();
    // Preserve doc and namespace so the projected schema still matches the
    // file schema during Avro projection resolution (original passed null).
    Schema readSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, fields);
    AvroReadSupport.setRequestedProjection(configuration, readSchema);
    try (ParquetReader<GenericRecord> reader = AvroParquetReader.genericRecordReader(HadoopInputFile.fromPath(dataFilePath, configuration))) {
        GenericRecord record;
        while ((record = reader.read()) != null) {
            out.collect(parseData(dataFilePath.toString(), RecordView.Operation.RESULT, record));
        }
    }
}
private void readLogFile(FileSystem readerFilesystem, Path logFilePath, Collector<RecordView> out) throws IOException {
private void readLogFile(FileSystem readerFilesystem, Path logFilePath, Collector<RecordView> out, ImmutableSet<String> filterFields) throws IOException {
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(logFilePath), null)) {
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();

View File

@@ -6,12 +6,12 @@ import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Predicate;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
@@ -49,25 +49,14 @@ public class HdfsHelper {
}
/**
 * Predicts the Flink parallelism for scanning base files.
 * One slot per path, clamped to the range [1, 500].
 *
 * Fix: the merged diff left two consecutive return statements (the old
 * "pathNum / 2" formula followed by the new one), which is unreachable
 * code and does not compile; only the new formula is kept.
 */
public static Integer baseScanParallelismPredict(Integer pathNum) {
    return Math.max(1, Math.min(pathNum, 500));
}
// Convenience overload: resolves the root string to a Path and delegates.
// NOTE(review): delegates to a (FileSystem, Path) overload that the
// surrounding change appears to replace with latestBasePaths(ImmutableList<FileStatus>)
// — confirm this wrapper still compiles / is still needed.
public static ImmutableList<String> latestBasePaths(FileSystem fileSystem, String root) throws IOException {
return latestBasePaths(fileSystem, new Path(root));
}
public static ImmutableList<String> latestBasePaths(FileSystem fileSystem, Path root) throws IOException {
return basePaths(fileSystem, root)
public static ImmutableList<String> latestBasePaths(ImmutableList<FileStatus> paths) {
return paths
.asParallel(ExecutorProvider.EXECUTORS, 1)
.collect(Path::new)
.reject(path -> {
try {
return FSUtils.getFileSize(fileSystem, path) < 1;
} catch (IOException e) {
logger.error("Get file size error", e);
}
return true;
})
.reject(status -> status.getLen() < 1)
.collect(FileStatus::getPath)
.groupBy(FSUtils::getFileIdFromFilePath)
.multiValuesView()
.collect(pathList -> pathList
@@ -87,25 +76,24 @@ public class HdfsHelper {
.toImmutable();
}
public static ImmutableList<String> basePaths(FileSystem fileSystem, String root) throws IOException {
return basePaths(fileSystem, new Path(root));
public static ImmutableList<FileStatus> basePaths(ImmutableList<FileStatus> paths) throws IOException {
return paths.select(status -> FSUtils.isBaseFile(status.getPath()));
}
public static ImmutableList<String> basePaths(FileSystem fileSystem, Path root) throws IOException {
return hdfsPaths(fileSystem, root, FSUtils::isBaseFile);
public static ImmutableList<FileStatus> logPaths(ImmutableList<FileStatus> paths) {
return paths.select(status -> FSUtils.isLogFile(status.getPath()));
}
public static ImmutableList<String> logPaths(FileSystem fileSystem, String root) throws IOException {
return logPaths(fileSystem, new Path(root));
public static ImmutableList<FileStatus> hdfsPaths(FileSystem fileSystem, String root) throws IOException {
return hdfsPaths(fileSystem, new Path(root));
}
public static ImmutableList<String> logPaths(FileSystem fileSystem, Path root) throws IOException {
return hdfsPaths(fileSystem, root, FSUtils::isLogFile);
}
public static ImmutableList<String> hdfsPaths(FileSystem fileSystem, Path root, Predicate<Path> check) throws IOException {
/**
* 获取hdfs文件列表这个方法是专门给hudi表查询base或log文件使用所以只扫描一层
*/
public static ImmutableList<FileStatus> hdfsPaths(FileSystem fileSystem, Path root) throws IOException {
return Lists.immutable.of(fileSystem.listStatus(root))
.reject(status -> StrUtil.equals(".hoodie", status.getPath().getName()))
.reject(status -> StrUtil.equals(HoodieTableMetaClient.METAFOLDER_NAME, status.getPath().getName()))
.flatCollect(status -> {
try {
if (status.isDirectory()) {
@@ -116,15 +104,12 @@ public class HdfsHelper {
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(FileStatus::getPath)
.select(check::test)
.collect(Path::toString);
});
}
public static void createResult(FileSystem fileSystem, TaskContext context, String result) throws IOException {
Path resultPath = new Path(context.getResultPath() + "/" + context.getTaskId() + "/task-result");
try(FSDataOutputStream outputStream = fileSystem.create(resultPath)) {
try (FSDataOutputStream outputStream = fileSystem.create(resultPath)) {
outputStream.writeUTF(result);
}
}