refactor(executor-task): 优化日志文件和数据文件读取模式

This commit is contained in:
2024-01-11 10:09:57 +08:00
parent 8d6c637b37
commit 7841296b0c

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.executor.task;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
@@ -8,6 +9,7 @@ import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
import java.io.IOException;
import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -49,7 +51,7 @@ public class DataScanner {
throw new RuntimeException(StrUtil.format("HDFS {} is not exists", hdfs));
}
ImmutableList<String> paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs)))
ImmutableList<Path> paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs)))
.reject(status -> StrUtil.equals(".hoodie", status.getPath().getName()))
.flatCollect(status -> {
try {
@@ -62,18 +64,32 @@ public class DataScanner {
throw new RuntimeException(e);
}
})
.collect(FileStatus::getPath)
.select(path -> (FSUtils.isLogFile(path) && scanLog) || (FSUtils.isDataFile(path) && scanData))
.collect(Path::toString);
.collect(FileStatus::getPath);
StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
environment.setParallelism(Math.max(paths.size() / 5, 1));
environment.fromCollection(paths.toList())
.shuffle()
.flatMap(new ReadHudiFile())
.map(RecordView::toString)
environment.setParallelism(20);
DataStream<RecordView> source = null;
if (scanLog) {
ImmutableList<String> logPaths = paths.select(FSUtils::isLogFile).collect(Path::toString);
source = environment.fromCollection(logPaths.toList())
.flatMap(new ReadHudiFile())
.setParallelism(Math.max(1, logPaths.size() / 20));
}
if (scanData) {
ImmutableList<String> dataPaths = paths.select(FSUtils::isDataFile).collect(Path::toString);
int parallelism = Math.max(1, dataPaths.size() / 2);
if (ObjectUtil.isNull(source)) {
source = environment.fromCollection(dataPaths.toList()).flatMap(new ReadHudiFile()).setParallelism(parallelism);
} else {
source = source.union(environment.fromCollection(dataPaths.toList()).flatMap(new ReadHudiFile()).setParallelism(parallelism));
}
}
if (ObjectUtil.isNull(source)) {
throw new RuntimeException("Source cannot be null");
}
source.map(RecordView::toString)
.filter(line -> StrUtil.contains(line, key))
.disableChaining()
.sinkTo(FlinkHelper.createFileSink(taskContext));
environment.execute(StrUtil.format("Search {} in {}", key, hdfs));
}