feat(executor): 完成文件检索扫描功能

使用flink分布式扫描日志文件和数据文件,检索关键词
This commit is contained in:
2024-01-10 16:15:11 +08:00
parent b69512f728
commit be64de7fe8
10 changed files with 328 additions and 144 deletions

View File

@@ -69,6 +69,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.collections</groupId>

View File

@@ -3,14 +3,17 @@ package com.lanyuanxiaoyao.service.executor.task;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
import com.lanyuanxiaoyao.service.executor.task.functions.ReadLogFile;
import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile;
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
import java.io.IOException;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
@@ -22,8 +25,8 @@ import org.slf4j.LoggerFactory;
* @author lanyuanxiaoyao
* @date 2024-01-08
*/
public class AvroScanner {
private static final Logger logger = LoggerFactory.getLogger(AvroScanner.class);
public class DataScanner {
private static final Logger logger = LoggerFactory.getLogger(DataScanner.class);
public static void main(String[] args) throws Exception {
TaskContext taskContext = ArgumentsHelper.getContext(args);
@@ -34,6 +37,11 @@ public class AvroScanner {
String hdfs = (String) metadata.get("hdfs");
ArgumentsHelper.checkMetadata(taskContext, "key");
String key = (String) metadata.get("key");
Boolean scanLog = (Boolean) metadata.getOrDefault("scan_log", true);
Boolean scanData = (Boolean) metadata.getOrDefault("scan_data", false);
if (!scanLog && !scanData) {
throw new RuntimeException("Must choose mode scan_log or scan_data");
}
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(configuration);
@@ -43,13 +51,29 @@ public class AvroScanner {
ImmutableList<String> paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs)))
.reject(status -> StrUtil.equals(".hoodie", status.getPath().getName()))
.collect(status -> status.getPath().toString());
.flatCollect(status -> {
try {
if (status.isDirectory()) {
return Lists.immutable.of(fileSystem.listStatus(status.getPath()));
} else {
return Lists.immutable.of(status);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(FileStatus::getPath)
.select(path -> (FSUtils.isLogFile(path) && scanLog) || (FSUtils.isDataFile(path) && scanData))
.collect(Path::toString);
StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
environment.setParallelism(20);
FlinkHelper.getAllLogFilePaths(environment.fromCollection(paths.toList()))
.flatMap(new ReadLogFile())
environment.setParallelism(Math.max(paths.size() / 5, 1));
environment.fromCollection(paths.toList())
.shuffle()
.flatMap(new ReadHudiFile())
.map(RecordView::toString)
.filter(line -> StrUtil.contains(line, key))
.disableChaining()
.sinkTo(FlinkHelper.createFileSink(taskContext));
environment.execute(StrUtil.format("Search {} in {}", key, hdfs));
}

View File

@@ -48,7 +48,7 @@ public class RecordView implements Serializable, Comparable<RecordView> {
@Override
public String toString() {
return StrUtil.format("\n{} {} {}\n{}", operation, timestamp, file, data);
return StrUtil.format("{} {} {} {}", operation, timestamp, file, data);
}
@Override

View File

@@ -13,26 +13,32 @@ import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.*;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.io.storage.HoodieParquetReader;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.org.apache.avro.util.Utf8;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 读取log文件
* 读取log/data文件
*
* @author lanyuanxiaoyao
* @date 2024-01-09
*/
public class ReadLogFile implements FlatMapFunction<String, RecordView> {
private RecordView parseData(String source, IndexedRecord record) {
public class ReadHudiFile implements FlatMapFunction<String, RecordView> {
private static final Logger logger = LoggerFactory.getLogger(ReadHudiFile.class);
private RecordView parseData(String source, RecordView.Operation operation, IndexedRecord record) {
Schema schema = record.getSchema();
StringBuilder builder = new StringBuilder();
for (Schema.Field field : schema.getFields()) {
@@ -53,19 +59,41 @@ public class ReadLogFile implements FlatMapFunction<String, RecordView> {
}
String data = builder.toString();
RecordView recordView = new RecordView(RecordView.Operation.UPSERT, data, timestamp, source);
RecordView recordView = new RecordView(operation, data, timestamp, source);
recordView.getAttributes().put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, latestOpTs);
return recordView;
}
@Override
public void flatMap(String logFilePath, Collector<RecordView> out) throws IOException {
public void flatMap(String value, Collector<RecordView> out) throws IOException {
Configuration readerConfiguration = new Configuration();
FileSystem readerFilesystem = FileSystem.get(readerConfiguration);
MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(readerFilesystem, new Path(logFilePath));
Path filePath = new Path(value);
if (FSUtils.isLogFile(filePath)) {
readLogFile(readerFilesystem, filePath, out);
} else if (FSUtils.isDataFile(filePath)) {
readDataFile(readerFilesystem, filePath, out);
} else {
logger.warn("Cannot read file format: {}", filePath);
}
}
/**
 * Reads every record out of a Hudi parquet (base/data) file and emits each one
 * to the collector as a {@code RESULT} record view.
 *
 * @param readerFilesystem filesystem whose configuration is used to open the parquet reader
 * @param dataFilePath     path of the Hudi data (parquet) file to scan
 * @param out              Flink collector receiving one RecordView per parquet record
 * @throws IOException if the parquet file cannot be opened or read
 */
private void readDataFile(FileSystem readerFilesystem, Path dataFilePath, Collector<RecordView> out) throws IOException {
    HoodieParquetReader<IndexedRecord> reader = new HoodieParquetReader<>(readerFilesystem.getConf(), dataFilePath);
    try {
        ClosableIterator<IndexedRecord> recordIterator = reader.getRecordIterator();
        try {
            while (recordIterator.hasNext()) {
                // RESULT marks rows read from a base file (vs UPSERT/DELETE from log blocks).
                RecordView recordView = parseData(dataFilePath.toString(), RecordView.Operation.RESULT, recordIterator.next());
                out.collect(recordView);
            }
        } finally {
            // Original closed only on the happy path; an exception from parseData/collect
            // would leak the iterator and the underlying parquet reader.
            recordIterator.close();
        }
    } finally {
        reader.close();
    }
}
private void readLogFile(FileSystem readerFilesystem, Path logFilePath, Collector<RecordView> out) throws IOException {
MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(readerFilesystem, logFilePath);
Schema schema = new AvroSchemaConverter().convert(Objects.requireNonNull(messageType));
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(new Path(logFilePath)), schema)) {
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(logFilePath), schema)) {
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
Map<HoodieLogBlock.HeaderMetadataType, String> logBlockHeader = block.getLogBlockHeader();
@@ -75,7 +103,7 @@ public class ReadLogFile implements FlatMapFunction<String, RecordView> {
HoodieAvroDataBlock avroDataBlock = (HoodieAvroDataBlock) block;
try (ClosableIterator<IndexedRecord> avroDataBlockRecordIterator = avroDataBlock.getRecordIterator()) {
while (avroDataBlockRecordIterator.hasNext()) {
RecordView recordView = parseData(logFilePath, avroDataBlockRecordIterator.next());
RecordView recordView = parseData(logFilePath.toString(), RecordView.Operation.UPSERT, avroDataBlockRecordIterator.next());
out.collect(recordView);
}
}
@@ -84,7 +112,7 @@ public class ReadLogFile implements FlatMapFunction<String, RecordView> {
HoodieParquetDataBlock parquetDataBlock = (HoodieParquetDataBlock) block;
try (ClosableIterator<IndexedRecord> parquetDataBlockRecordIterator = parquetDataBlock.getRecordIterator()) {
while (parquetDataBlockRecordIterator.hasNext()) {
RecordView recordView = parseData(logFilePath, parquetDataBlockRecordIterator.next());
RecordView recordView = parseData(logFilePath.toString(), RecordView.Operation.UPSERT, parquetDataBlockRecordIterator.next());
out.collect(recordView);
}
}
@@ -96,12 +124,12 @@ public class ReadLogFile implements FlatMapFunction<String, RecordView> {
String keys = Arrays.stream(deleteBlock.getRecordsToDelete())
.map(deleteRecord -> deleteRecord.getHoodieKey().toString())
.collect(Collectors.joining(" "));
out.collect(new RecordView(RecordView.Operation.DELETE, keys, instant, logFilePath));
out.collect(new RecordView(RecordView.Operation.DELETE, keys, instant, logFilePath.toString()));
break;
case COMMAND_BLOCK:
HoodieCommandBlock commandBlock = (HoodieCommandBlock) block;
Map<HoodieLogBlock.HeaderMetadataType, String> header = commandBlock.getLogBlockHeader();
out.collect(new RecordView(RecordView.Operation.ROLLBACK, header.get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME), instant, logFilePath));
out.collect(new RecordView(RecordView.Operation.ROLLBACK, header.get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME), instant, logFilePath.toString()));
break;
default:
break;

View File

@@ -43,31 +43,4 @@ public class FlinkHelper {
.withOutputFileConfig(new OutputFileConfig("task", ""))
.build();
}
/**
 * Expands a stream of directory paths into a stream of the paths of the files
 * listed directly under each directory (one level, no recursion).
 *
 * @param source stream of HDFS directory paths as strings
 * @return stream of child file paths as strings
 */
public static DataStream<String> getAllFilePaths(DataStream<String> source) {
    return source
        .map(directory -> {
            // A fresh Configuration/FileSystem per element keeps the lambda serializable.
            Configuration hadoopConfiguration = new Configuration();
            FileSystem fs = FileSystem.get(hadoopConfiguration);
            FileStatus[] children = fs.listStatus(new org.apache.hadoop.fs.Path(directory));
            String[] childPaths = new String[children.length];
            int position = 0;
            for (FileStatus child : children) {
                childPaths[position++] = child.getPath().toString();
            }
            return childPaths;
        })
        .name("Read files")
        .flatMap(new FlatMapIterator<String[], String>() {
            @Override
            public Iterator<String> flatMap(String[] paths) {
                // Flatten each listing array back into individual path elements.
                return Arrays.asList(paths).iterator();
            }
        });
}
/**
 * Lists the files under each directory in {@code source} and keeps only the
 * Hudi log files (as judged by {@link FSUtils#isLogFile}).
 *
 * @param source stream of HDFS directory paths as strings
 * @return stream containing only log-file paths
 */
public static DataStream<String> getAllLogFilePaths(DataStream<String> source) {
    DataStream<String> allFilePaths = getAllFilePaths(source);
    return allFilePaths
        .filter(path -> FSUtils.isLogFile(path))
        .name("Filter log files");
}
}