refactor(executor-task): 优化文件扫描工具检索效率

扫描全部的log文件和最新的base文件
This commit is contained in:
2024-01-17 18:26:47 +08:00
parent c74b38588b
commit f137eaf4be
2 changed files with 108 additions and 9 deletions

View File

@@ -2,6 +2,7 @@ package com.lanyuanxiaoyao.service.executor.task;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile;
@@ -9,6 +10,7 @@ import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
@@ -30,6 +32,36 @@ import org.slf4j.LoggerFactory;
public class DataScanner {
private static final Logger logger = LoggerFactory.getLogger(DataScanner.class);
/**
 * Reduces the given paths to one path string per file-id group.
 *
 * <p>Paths whose size is below one byte, or whose size cannot be read, are dropped.
 * The remaining paths are grouped by {@link FSUtils#getFileIdFromFilePath}, and from
 * each group only the path with the greatest numeric commit time is kept.
 *
 * @param fileSystem file system used to query the size of each path
 * @param paths      candidate paths
 * @return string form of the selected path of every group
 */
private static ImmutableList<String> parsePaths(FileSystem fileSystem, ImmutableList<Path> paths) {
    return paths
        .asParallel(ExecutorProvider.EXECUTORS, 1)
        .reject(candidate -> isEmptyOrUnreadable(fileSystem, candidate))
        .groupBy(FSUtils::getFileIdFromFilePath)
        .multiValuesView()
        // Sorting ascending and taking the last element yields the largest commit time.
        .collect(group -> group.toSortedListBy(DataScanner::commitTimeOrZero).getLastOptional())
        .select(Optional::isPresent)
        .collect(latest -> latest.get().toString())
        .toList()
        .toImmutable();
}

/**
 * Tells whether a path should be excluded from scanning: its size is below one byte,
 * or the size lookup failed (the failure is logged and the path is excluded).
 */
private static boolean isEmptyOrUnreadable(FileSystem fileSystem, Path candidate) {
    try {
        return FSUtils.getFileSize(fileSystem, candidate) < 1;
    } catch (IOException e) {
        logger.error("Get file size error", e);
        return true;
    }
}

/**
 * Sort key for a path: the commit time taken from its file name as a number,
 * or {@code 0} when that text cannot be converted.
 */
private static Long commitTimeOrZero(Path candidate) {
    // Extracting the commit time stays outside the try block, as in the original lambda.
    String commitTime = FSUtils.getCommitTime(candidate.getName());
    try {
        return Long.valueOf(commitTime);
    } catch (Throwable ignored) {
        // Best effort: an unparsable commit time sorts first instead of failing the scan.
        return 0L;
    }
}
public static void main(String[] args) throws Exception {
TaskContext taskContext = ArgumentsHelper.getContext(args);
logger.info("Context: {}", taskContext);
@@ -67,30 +99,54 @@ public class DataScanner {
.collect(FileStatus::getPath);
StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
environment.setParallelism(20);
DataStream<RecordView> source = null;
int totalParallelism = 20;
if (scanLog) {
ImmutableList<String> logPaths = paths.select(FSUtils::isLogFile).collect(Path::toString);
source = environment.fromCollection(logPaths.toList())
int parallelism = Math.max(1, Math.min(logPaths.size() / 20, 100));
totalParallelism = Math.max(totalParallelism, parallelism);
source = environment
.fromCollection(logPaths.toList())
.name("Read log paths")
.flatMap(new ReadHudiFile())
.setParallelism(Math.max(1, logPaths.size() / 20));
.name("Read hudi file")
.setParallelism(parallelism);
}
if (scanData) {
ImmutableList<String> dataPaths = paths.select(FSUtils::isDataFile).collect(Path::toString);
int parallelism = Math.max(1, dataPaths.size() / 2);
ImmutableList<String> dataPaths = parsePaths(fileSystem, paths.select(FSUtils::isBaseFile));
int parallelism = Math.max(1, Math.min(dataPaths.size() / 2, 500));
totalParallelism = Math.max(totalParallelism, parallelism);
if (ObjectUtil.isNull(source)) {
source = environment.fromCollection(dataPaths.toList()).flatMap(new ReadHudiFile()).setParallelism(parallelism);
source = environment
.fromCollection(dataPaths.toList())
.name("Read base paths")
.flatMap(new ReadHudiFile())
.name("Read hudi file")
.setParallelism(parallelism);
} else {
source = source.union(environment.fromCollection(dataPaths.toList()).flatMap(new ReadHudiFile()).setParallelism(parallelism));
source = source.union(environment
.fromCollection(dataPaths.toList())
.name("Read base paths")
.flatMap(new ReadHudiFile())
.name("Read hudi file")
.setParallelism(parallelism));
}
}
if (ObjectUtil.isNull(source)) {
throw new RuntimeException("Source cannot be null");
}
source.map(RecordView::toString)
environment.setParallelism(Math.max(50, Math.min(totalParallelism / 2, 200)));
source
.map(RecordView::toString)
.name("Covert record to string")
.filter(line -> StrUtil.contains(line, key))
.sinkTo(FlinkHelper.createFileSink(taskContext));
.name("Filter target key")
.sinkTo(FlinkHelper.createFileSink(taskContext))
.setParallelism(10)
.name("Output results");
environment.execute(StrUtil.format("Search {} in {}", key, hdfs));
}
}

View File

@@ -0,0 +1,43 @@
package com.lanyuanxiaoyao.service.executor.task;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
/**
 * Stand-alone check of the "latest base file per file id" selection.
 *
 * <p>Reads a text file containing one path per line, keeps the paths accepted by
 * {@link FSUtils#isBaseFile}, groups them by {@link FSUtils#getFileIdFromFilePath},
 * and prints the path with the greatest numeric commit time of each group.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-17
 */
public class ParseLatestPath {
    /** Listing file that is read when no command-line argument is supplied. */
    private static final String DEFAULT_LIST_FILE =
        "/Users/lanyuanxiaoyao/SynologyDrive/document/office/(2024-01-17-459) list.txt";

    /**
     * Prints the selected path of every file-id group found in the listing file.
     *
     * @param args optional; {@code args[0]} is the listing file to read. When absent,
     *             {@link #DEFAULT_LIST_FILE} is used.
     * @throws IOException if the listing file cannot be read
     */
    public static void main(String[] args) throws IOException {
        String listFile = (args != null && args.length > 0) ? args[0] : DEFAULT_LIST_FILE;

        List<Path> paths = Files.readAllLines(Paths.get(listFile))
            .stream()
            // A blank line would make the Path constructor throw, so skip it.
            .filter(line -> !line.trim().isEmpty())
            .map(Path::new)
            .collect(Collectors.toList());

        ImmutableList<String> targetPaths = Lists.immutable.ofAll(paths)
            .select(FSUtils::isBaseFile)
            .groupBy(FSUtils::getFileIdFromFilePath)
            .multiValuesView()
            // Sorting ascending and taking the last element yields the largest commit time.
            .collect(pathList -> pathList
                .toSortedListBy(ParseLatestPath::commitTimeOrZero)
                .getLastOptional())
            .select(Optional::isPresent)
            .collect(Optional::get)
            .collect(Path::toString)
            .toList()
            .toImmutable();

        targetPaths.forEach(System.out::println);
    }

    /**
     * Sort key for a path: the commit time taken from its file name as a number,
     * or {@code 0} when that text is not a valid number, so that a single odd file
     * name does not abort the whole run.
     */
    private static Long commitTimeOrZero(Path path) {
        String commitTime = FSUtils.getCommitTime(path.getName());
        try {
            return Long.valueOf(commitTime);
        } catch (NumberFormatException ignored) {
            // Unparsable (or null) commit time: sort it before every real commit.
            return 0L;
        }
    }
}