diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java
index 98a74d1..16bbfc7 100644
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java
@@ -1,5 +1,6 @@
 package com.lanyuanxiaoyao.service.executor.task;
 
+import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
 import com.lanyuanxiaoyao.service.executor.core.TaskContext;
 import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
@@ -8,6 +9,7 @@ import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
 import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
 import java.io.IOException;
 import java.util.Map;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -49,7 +51,7 @@ public class DataScanner {
             throw new RuntimeException(StrUtil.format("HDFS {} is not exists", hdfs));
         }
 
-        ImmutableList<String> paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs)))
+        ImmutableList<Path> paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs)))
                 .reject(status -> StrUtil.equals(".hoodie", status.getPath().getName()))
                 .flatCollect(status -> {
                     try {
@@ -62,18 +64,32 @@ public class DataScanner {
                         throw new RuntimeException(e);
                     }
                 })
-                .collect(FileStatus::getPath)
-                .select(path -> (FSUtils.isLogFile(path) && scanLog) || (FSUtils.isDataFile(path) && scanData))
-                .collect(Path::toString);
+                .collect(FileStatus::getPath);
 
         StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
-        environment.setParallelism(Math.max(paths.size() / 5, 1));
-        environment.fromCollection(paths.toList())
-                .shuffle()
-                .flatMap(new ReadHudiFile())
-                .map(RecordView::toString)
+        environment.setParallelism(20);
+
+        DataStream<RecordView> source = null;
+        if (scanLog) {
+            ImmutableList<String> logPaths = paths.select(FSUtils::isLogFile).collect(Path::toString);
+            source = environment.fromCollection(logPaths.toList())
+                    .flatMap(new ReadHudiFile())
+                    .setParallelism(Math.max(1, logPaths.size() / 20));
+        }
+        if (scanData) {
+            ImmutableList<String> dataPaths = paths.select(FSUtils::isDataFile).collect(Path::toString);
+            int parallelism = Math.max(1, dataPaths.size() / 2);
+            if (ObjectUtil.isNull(source)) {
+                source = environment.fromCollection(dataPaths.toList()).flatMap(new ReadHudiFile()).setParallelism(parallelism);
+            } else {
+                source = source.union(environment.fromCollection(dataPaths.toList()).flatMap(new ReadHudiFile()).setParallelism(parallelism));
+            }
+        }
+        if (ObjectUtil.isNull(source)) {
+            throw new RuntimeException("Source cannot be null");
+        }
+        source.map(RecordView::toString)
                 .filter(line -> StrUtil.contains(line, key))
-                .disableChaining()
                 .sinkTo(FlinkHelper.createFileSink(taskContext));
         environment.execute(StrUtil.format("Search {} in {}", key, hdfs));
     }