diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java index 16bbfc7..0b53645 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java @@ -2,6 +2,7 @@ package com.lanyuanxiaoyao.service.executor.task; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.executor.core.TaskContext; import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile; @@ -9,6 +10,7 @@ import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper; import java.io.IOException; import java.util.Map; +import java.util.Optional; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.hadoop.conf.Configuration; @@ -30,6 +32,36 @@ import org.slf4j.LoggerFactory; public class DataScanner { private static final Logger logger = LoggerFactory.getLogger(DataScanner.class); + private static ImmutableList parsePaths(FileSystem fileSystem, ImmutableList paths) { + return paths + .asParallel(ExecutorProvider.EXECUTORS, 1) + .reject(path -> { + try { + return FSUtils.getFileSize(fileSystem, path) < 1; + } catch (IOException e) { + logger.error("Get file size error", e); + } + return true; + }) + .groupBy(FSUtils::getFileIdFromFilePath) + .multiValuesView() + .collect(pathList -> pathList + .toSortedListBy(path -> { + String commitTime = FSUtils.getCommitTime(path.getName()); + try 
{ + return Long.valueOf(commitTime); + } catch (Throwable throwable) { + return 0L; + } + }) + .getLastOptional()) + .select(Optional::isPresent) + .collect(Optional::get) + .collect(Path::toString) + .toList() + .toImmutable(); + } + public static void main(String[] args) throws Exception { TaskContext taskContext = ArgumentsHelper.getContext(args); logger.info("Context: {}", taskContext); @@ -67,30 +99,54 @@ public class DataScanner { .collect(FileStatus::getPath); StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment(); - environment.setParallelism(20); DataStream source = null; + int totalParallelism = 20; if (scanLog) { ImmutableList logPaths = paths.select(FSUtils::isLogFile).collect(Path::toString); - source = environment.fromCollection(logPaths.toList()) + int parallelism = Math.max(1, Math.min(logPaths.size() / 20, 100)); + totalParallelism = Math.max(totalParallelism, parallelism); + source = environment + .fromCollection(logPaths.toList()) + .name("Read log paths") .flatMap(new ReadHudiFile()) - .setParallelism(Math.max(1, logPaths.size() / 20)); + .name("Read hudi file") + .setParallelism(parallelism); } if (scanData) { - ImmutableList dataPaths = paths.select(FSUtils::isDataFile).collect(Path::toString); - int parallelism = Math.max(1, dataPaths.size() / 2); + ImmutableList dataPaths = parsePaths(fileSystem, paths.select(FSUtils::isBaseFile)); + int parallelism = Math.max(1, Math.min(dataPaths.size() / 2, 500)); + totalParallelism = Math.max(totalParallelism, parallelism); if (ObjectUtil.isNull(source)) { - source = environment.fromCollection(dataPaths.toList()).flatMap(new ReadHudiFile()).setParallelism(parallelism); + source = environment + .fromCollection(dataPaths.toList()) + .name("Read base paths") + .flatMap(new ReadHudiFile()) + .name("Read hudi file") + .setParallelism(parallelism); } else { - source = source.union(environment.fromCollection(dataPaths.toList()).flatMap(new ReadHudiFile()).setParallelism(parallelism)); + 
source = source.union(environment + .fromCollection(dataPaths.toList()) + .name("Read base paths") + .flatMap(new ReadHudiFile()) + .name("Read hudi file") + .setParallelism(parallelism)); } } if (ObjectUtil.isNull(source)) { throw new RuntimeException("Source cannot be null"); } - source.map(RecordView::toString) + + environment.setParallelism(Math.max(50, Math.min(totalParallelism / 2, 200))); + + source + .map(RecordView::toString) + .name("Convert record to string") .filter(line -> StrUtil.contains(line, key)) - .sinkTo(FlinkHelper.createFileSink(taskContext)); + .name("Filter target key") + .sinkTo(FlinkHelper.createFileSink(taskContext)) + .setParallelism(10) + .name("Output results"); environment.execute(StrUtil.format("Search {} in {}", key, hdfs)); } } diff --git a/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ParseLatestPath.java b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ParseLatestPath.java new file mode 100644 index 0000000..7699eba --- /dev/null +++ b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ParseLatestPath.java @@ -0,0 +1,43 @@ +package com.lanyuanxiaoyao.service.executor.task; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.fs.FSUtils; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @author lanyuanxiaoyao + * @date 2024-01-17 + */ +public class ParseLatestPath { + public static void main(String[] args) throws IOException { + List paths = Files.readAllLines(Paths.get("/Users/lanyuanxiaoyao/SynologyDrive/document/office/(2024-01-17-459) list.txt")) + .stream() + .map(Path::new) + 
.collect(Collectors.toList()); + ImmutableList targetPaths = Lists.immutable.ofAll(paths) + .select(FSUtils::isBaseFile) + .groupBy(FSUtils::getFileIdFromFilePath) + .multiValuesView() + .collect(pathList -> pathList + .toSortedListBy(path -> { + String commitTime = FSUtils.getCommitTime(path.getName()); + System.out.println(commitTime); + return Long.valueOf(commitTime); + }) + .getLastOptional()) + .select(Optional::isPresent) + .collect(Optional::get) + .collect(Path::toString) + .toList() + .toImmutable(); + targetPaths.forEach(System.out::println); + } +}