diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java index 9a21e27..ce9165b 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java @@ -36,9 +36,10 @@ public class ExecutorTaskController { @RequestParam(value = "scan_queue", defaultValue = "false") Boolean scanQueue, @RequestParam(value = "scan_log", defaultValue = "false") Boolean scanLog, @RequestParam(value = "scan_base", defaultValue = "false") Boolean scanBase, - @RequestParam(value = "scan_target", defaultValue = "false") Boolean scanTarget + @RequestParam(value = "scan_target", defaultValue = "false") Boolean scanTarget, + @RequestParam(value = "filter_fields", required = false) String filterFields ) throws Exception { - logger.info("Enter method: scan[key, hdfs, pulsar, pulsarTopic, scanSource, scanQueue, scanLog, scanBase, scanTarget]. " + "key:" + key + "," + "hdfs:" + hdfs + "," + "pulsar:" + pulsar + "," + "pulsarTopic:" + pulsarTopic + "," + "scanSource:" + scanSource + "," + "scanQueue:" + scanQueue + "," + "scanLog:" + scanLog + "," + "scanBase:" + scanBase + "," + "scanTarget:" + scanTarget); + logger.info("Enter method: scan[key, hdfs, pulsar, pulsarTopic, scanSource, scanQueue, scanLog, scanBase, scanTarget, filter_fields]. 
" + "key:" + key + "," + "hdfs:" + hdfs + "," + "pulsar:" + pulsar + "," + "pulsarTopic:" + pulsarTopic + "," + "scanSource:" + scanSource + "," + "scanQueue:" + scanQueue + "," + "scanLog:" + scanLog + "," + "scanBase:" + scanBase + "," + "scanTarget:" + scanTarget + "," + "filter_fields:" + filterFields); if (!scanSource && !scanQueue && !scanLog && !scanBase && !scanTarget) { throw new RuntimeException("Must choose one mode"); } @@ -48,7 +49,7 @@ public class ExecutorTaskController { if ((scanLog || scanBase) && StrUtil.isBlank(hdfs)) { throw new RuntimeException("Hdfs path cannot be empty"); } - return executorTaskService.scanAvro(key, hdfs, pulsar, pulsarTopic, scanSource, scanQueue, scanLog, scanBase, scanTarget); + return executorTaskService.scanAvro(key, hdfs, pulsar, pulsarTopic, scanSource, scanQueue, scanLog, scanBase, scanTarget, filterFields); } @GetMapping("latest_op_ts") diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java index cc0a2fb..a0a6741 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java @@ -21,7 +21,18 @@ import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.flink.client.cli.ClientOptions; -import org.apache.flink.configuration.*; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import 
org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.configuration.YarnDeploymentTarget; import org.apache.hadoop.fs.FSDataInputStream; @@ -141,10 +152,23 @@ public class ExecutorTaskService { Boolean scanQueue, Boolean scanLog, Boolean scanBase, - Boolean scanTarget + Boolean scanTarget, + String filterFields ) throws Exception { String taskId = taskId(); - Configuration configuration = generateConfiguration(taskId, "scan " + key); + + MutableList types = Lists.mutable.empty(); + if (scanSource) + types.add("source"); + if (scanQueue) + types.add("queue"); + if (scanLog) + types.add("log"); + if (scanBase) + types.add("base"); + if (scanTarget) + types.add("target"); + Configuration configuration = generateConfiguration(taskId, StrUtil.format("scan {} {}", types.makeString(","), key)); MapBuilder builder = MapUtil.builder(); setEnvironment(configuration, "key", key); @@ -164,6 +188,10 @@ public class ExecutorTaskService { builder.put("pulsar", pulsar); builder.put("pulsar_topic", pulsarTopic); } + + if (StrUtil.isNotBlank(filterFields)) { + builder.put("filter_fields", filterFields); + } ApplicationId applicationId = Runner.run( configuration, "com.lanyuanxiaoyao.service.executor.task.DataScanner", diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java index 561ded0..6a498d5 100644 --- 
a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java @@ -9,13 +9,20 @@ import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSourc import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper; import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper; +import java.util.Arrays; import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.eclipse.collections.api.factory.Sets; import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.set.ImmutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +42,7 @@ public class DataScanner { Map metadata = taskContext.getMetadata(); ArgumentsHelper.checkMetadata(taskContext, "key"); String key = (String) metadata.get("key"); + String[] keys = key.split(","); Boolean scanQueue = (Boolean) metadata.getOrDefault("scan_queue", false); Boolean scanLog = (Boolean) metadata.getOrDefault("scan_log", false); Boolean scanBase = (Boolean) metadata.getOrDefault("scan_base", false); @@ -43,9 +51,23 @@ public class DataScanner { throw new RuntimeException("Must choose mode scan_queue or scan_log or scan_data"); } + ImmutableSet filterFields = Sets.immutable.empty(); + String fieldText = (String) metadata.get("filter_fields"); + if (StrUtil.isNotBlank(fieldText)) { + filterFields = 
Sets.immutable.of(fieldText.split(",")).collect(StrUtil::trim); + } + StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment(); - DataStream source = null; + DataStream source = null; + BiFunction, Integer, DataStream> filterKeys = (stream, parallelism) -> stream + .map(RecordView::toString) + .setParallelism(parallelism) + .name("Covert record to string") + .filter(line -> StrUtil.containsAny(line, keys)) + .setParallelism(parallelism) + .name("Filter target key"); + Function parallelismPredict = parallelism -> Math.max(50, Math.min(parallelism / 2, 200)); int totalParallelism = 30; if (scanQueue) { ArgumentsHelper.checkMetadata(taskContext, "pulsar"); @@ -53,10 +75,13 @@ public class DataScanner { ArgumentsHelper.checkMetadata(taskContext, "pulsar_topic"); String pulsarTopic = (String) metadata.get("pulsar_topic"); logger.info("Scan queue topic: {} url: {}", pulsarTopic, pulsarUrl); - DataStream stream = environment - .fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic), WatermarkStrategy.noWatermarks(), "Read pulsar") - .setParallelism(totalParallelism) - .disableChaining(); + DataStream stream = filterKeys.apply( + environment + .fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic), WatermarkStrategy.noWatermarks(), "Read pulsar") + .setParallelism(totalParallelism) + .disableChaining(), + totalParallelism + ); if (ObjectUtil.isNull(source)) { source = stream; } else { @@ -69,17 +94,24 @@ public class DataScanner { FileSystem fileSystem = FileSystem.get(new Configuration()); HdfsHelper.checkHdfsPath(fileSystem, hdfs); + ImmutableList paths = HdfsHelper.hdfsPaths(fileSystem, hdfs); + if (scanLog) { logger.info("Scan log hdfs: {}", hdfs); - ImmutableList logPaths = HdfsHelper.logPaths(fileSystem, hdfs); + ImmutableList logPaths = HdfsHelper.logPaths(paths) + .collect(FileStatus::getPath) + .collect(Path::toString); int parallelism = HdfsHelper.logScanParallelismPredict(logPaths); totalParallelism = 
Math.max(totalParallelism, parallelism); - DataStream stream = environment - .fromCollection(logPaths.toList()) - .name("Read log paths") - .flatMap(new ReadHudiFile()) - .name("Read hudi file") - .setParallelism(parallelism); + DataStream stream = filterKeys.apply( + environment + .fromCollection(logPaths.toList()) + .name("Read log paths") + .flatMap(new ReadHudiFile(filterFields)) + .name("Read log file") + .setParallelism(parallelism), + parallelismPredict.apply(totalParallelism) + ); if (ObjectUtil.isNull(source)) { source = stream; } else { @@ -88,15 +120,18 @@ public class DataScanner { } if (scanBase) { logger.info("Scan base hdfs: {}", hdfs); - ImmutableList basePaths = HdfsHelper.latestBasePaths(fileSystem, hdfs); + ImmutableList basePaths = HdfsHelper.latestBasePaths(paths); int parallelism = HdfsHelper.baseScanParallelismPredict(basePaths); totalParallelism = Math.max(totalParallelism, parallelism); - DataStream stream = environment - .fromCollection(basePaths.toList()) - .name("Read base paths") - .flatMap(new ReadHudiFile()) - .name("Read hudi file") - .setParallelism(parallelism); + DataStream stream = filterKeys.apply( + environment + .fromCollection(basePaths.toList()) + .name("Read base paths") + .flatMap(new ReadHudiFile(filterFields)) + .name("Read base file") + .setParallelism(parallelism), + parallelismPredict.apply(totalParallelism) + ); if (ObjectUtil.isNull(source)) { source = stream; } else { @@ -108,16 +143,10 @@ public class DataScanner { throw new RuntimeException("Source cannot be null"); } - environment.setParallelism(Math.max(50, Math.min(totalParallelism / 2, 200))); source - .map(RecordView::toString) - .name("Covert record to string") - .filter(line -> StrUtil.contains(line, key)) - .name("Filter target key") .sinkTo(FlinkHelper.createFileSink(taskContext)) .setParallelism(10) .name("Output results"); - environment.execute(StrUtil.format("Search {}", key)); + environment.execute(StrUtil.format("Search {}", 
Arrays.toString(keys))); } } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java index b306ba1..76f4b4f 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java @@ -38,7 +38,7 @@ public class LatestOperationTimeScan { FileSystem fileSystem = FileSystem.get(new Configuration()); HdfsHelper.checkHdfsPath(fileSystem, hdfs); - ImmutableList basePaths = HdfsHelper.latestBasePaths(fileSystem, hdfs); + ImmutableList basePaths = HdfsHelper.latestBasePaths(HdfsHelper.hdfsPaths(fileSystem, hdfs)); int parallelism = HdfsHelper.baseScanParallelismPredict(basePaths); diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java index 7f2867c..6d90d63 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java @@ -7,6 +7,7 @@ import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; import java.io.IOException; import java.util.Arrays; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; @@ -14,6 +15,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -22,11 +24,22 @@ import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.io.storage.HoodieParquetReader; +import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.org.apache.avro.Schema; +import org.apache.hudi.org.apache.avro.generic.GenericRecord; import org.apache.hudi.org.apache.avro.generic.IndexedRecord; import org.apache.hudi.org.apache.avro.util.Utf8; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.factory.Sets; +import org.eclipse.collections.api.list.MutableList; +import org.eclipse.collections.api.set.ImmutableSet; +import org.eclipse.collections.api.set.MutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +52,19 @@ import org.slf4j.LoggerFactory; public class ReadHudiFile implements FlatMapFunction { private static final Logger logger = LoggerFactory.getLogger(ReadHudiFile.class); + private final MutableSet filterFields = Sets.mutable.of( + HoodieRecord.COMMIT_TIME_METADATA_FIELD, + Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME + ); + + public ReadHudiFile() { + this(Sets.immutable.empty()); + } + + public ReadHudiFile(ImmutableSet filterFields) { + this.filterFields.addAll(filterFields.toSet()); + } 
+ private RecordView parseData(String source, RecordView.Operation operation, IndexedRecord record) { Schema schema = record.getSchema(); StringBuilder builder = new StringBuilder(); @@ -56,12 +82,12 @@ public class ReadHudiFile { } String timestamp = null; Schema.Field commitTimeField = schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD); - if (ObjectUtil.isNotNull(commitTimeField)) { + if (ObjectUtil.isNotNull(commitTimeField) && ObjectUtil.isNotNull(record.get(commitTimeField.pos()))) { timestamp = ((Utf8) record.get(commitTimeField.pos())).toString(); } String latestOpTs = null; Schema.Field latestOpTsField = schema.getField(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME); - if (ObjectUtil.isNotNull(latestOpTsField)) { + if (ObjectUtil.isNotNull(latestOpTsField) && ObjectUtil.isNotNull(record.get(latestOpTsField.pos()))) { latestOpTs = ((Utf8) record.get(latestOpTsField.pos())).toString(); } @@ -80,26 +106,35 @@ public class ReadHudiFile { FileSystem readerFilesystem = FileSystem.get(readerConfiguration); Path filePath = new Path(value); if (FSUtils.isLogFile(filePath)) { - readLogFile(readerFilesystem, filePath, out); + readLogFile(readerFilesystem, filePath, out, filterFields.toImmutable()); } else if (FSUtils.isDataFile(filePath)) { - readDataFile(readerFilesystem, filePath, out); + readDataFile(readerFilesystem, filePath, out, filterFields.toImmutable()); } else { logger.warn("Cannot read file format: {}", filePath); } } - private void readDataFile(FileSystem readerFilesystem, Path dataFilePath, Collector out) throws IOException { - try(HoodieParquetReader reader = new HoodieParquetReader<>(readerFilesystem.getConf(), dataFilePath)) { - try(ClosableIterator recordIterator = reader.getRecordIterator()) { - while (recordIterator.hasNext()) { - RecordView recordView = parseData(dataFilePath.toString(), RecordView.Operation.RESULT, recordIterator.next()); - out.collect(recordView); - } + + private void 
readDataFile(FileSystem readerFilesystem, Path dataFilePath, Collector out, ImmutableSet filterFields) throws IOException { + Configuration configuration = readerFilesystem.getConf(); + ParquetUtils baseFileUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); + Schema schema = baseFileUtils.readAvroSchema(configuration, dataFilePath); + MutableList fields = Lists.mutable.ofAll(schema.getFields()) + .select(field -> filterFields.isEmpty() || filterFields.anySatisfy(name -> StrUtil.equals(name, field.name()))) + .collect(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())) + .toList(); + Schema readSchema = Schema.createRecord(schema.getName(), null, null, false, fields); + AvroReadSupport.setRequestedProjection(configuration, readSchema); + try (ParquetReader reader = AvroParquetReader.genericRecordReader(HadoopInputFile.fromPath(dataFilePath, configuration))) { + GenericRecord record; + while ((record = reader.read()) != null) { + RecordView recordView = parseData(dataFilePath.toString(), RecordView.Operation.RESULT, record); + out.collect(recordView); } } } - private void readLogFile(FileSystem readerFilesystem, Path logFilePath, Collector out) throws IOException { + private void readLogFile(FileSystem readerFilesystem, Path logFilePath, Collector out, ImmutableSet filterFields) throws IOException { try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(logFilePath), null)) { while (reader.hasNext()) { HoodieLogBlock block = reader.next(); diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java index 0df8e1c..0e88eed 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java +++ 
b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java @@ -6,12 +6,12 @@ import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.executor.core.TaskContext; import java.io.IOException; import java.util.Optional; -import java.util.function.Predicate; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; @@ -49,25 +49,14 @@ public class HdfsHelper { } public static Integer baseScanParallelismPredict(Integer pathNum) { - return Math.max(1, Math.min(pathNum / 2, 500)); + return Math.max(1, Math.min(pathNum, 500)); } - public static ImmutableList latestBasePaths(FileSystem fileSystem, String root) throws IOException { - return latestBasePaths(fileSystem, new Path(root)); - } - - public static ImmutableList latestBasePaths(FileSystem fileSystem, Path root) throws IOException { - return basePaths(fileSystem, root) + public static ImmutableList latestBasePaths(ImmutableList paths) { + return paths .asParallel(ExecutorProvider.EXECUTORS, 1) - .collect(Path::new) - .reject(path -> { - try { - return FSUtils.getFileSize(fileSystem, path) < 1; - } catch (IOException e) { - logger.error("Get file size error", e); - } - return true; - }) + .reject(status -> status.getLen() < 1) + .collect(FileStatus::getPath) .groupBy(FSUtils::getFileIdFromFilePath) .multiValuesView() .collect(pathList -> pathList @@ -87,25 +76,24 @@ public class HdfsHelper { .toImmutable(); } - public static ImmutableList basePaths(FileSystem fileSystem, String root) throws IOException { - return basePaths(fileSystem, new Path(root)); + public static ImmutableList 
basePaths(ImmutableList paths) throws IOException { + return paths.select(status -> FSUtils.isBaseFile(status.getPath())); } - public static ImmutableList basePaths(FileSystem fileSystem, Path root) throws IOException { - return hdfsPaths(fileSystem, root, FSUtils::isBaseFile); + public static ImmutableList logPaths(ImmutableList paths) { + return paths.select(status -> FSUtils.isLogFile(status.getPath())); } - public static ImmutableList logPaths(FileSystem fileSystem, String root) throws IOException { - return logPaths(fileSystem, new Path(root)); + public static ImmutableList hdfsPaths(FileSystem fileSystem, String root) throws IOException { + return hdfsPaths(fileSystem, new Path(root)); } - public static ImmutableList logPaths(FileSystem fileSystem, Path root) throws IOException { - return hdfsPaths(fileSystem, root, FSUtils::isLogFile); - } - - public static ImmutableList hdfsPaths(FileSystem fileSystem, Path root, Predicate check) throws IOException { + /** + * 获取hdfs文件列表,这个方法是专门给hudi表查询base或log文件使用,所以只扫描一层 + */ + public static ImmutableList hdfsPaths(FileSystem fileSystem, Path root) throws IOException { return Lists.immutable.of(fileSystem.listStatus(root)) - .reject(status -> StrUtil.equals(".hoodie", status.getPath().getName())) + .reject(status -> StrUtil.equals(HoodieTableMetaClient.METAFOLDER_NAME, status.getPath().getName())) .flatCollect(status -> { try { if (status.isDirectory()) { @@ -116,15 +104,12 @@ public class HdfsHelper { } catch (IOException e) { throw new RuntimeException(e); } - }) - .collect(FileStatus::getPath) - .select(check::test) - .collect(Path::toString); + }); } public static void createResult(FileSystem fileSystem, TaskContext context, String result) throws IOException { Path resultPath = new Path(context.getResultPath() + "/" + context.getTaskId() + "/task-result"); - try(FSDataOutputStream outputStream = fileSystem.create(resultPath)) { + try (FSDataOutputStream outputStream = fileSystem.create(resultPath)) { 
outputStream.writeUTF(result); } } diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java index 5d8dd03..50ad930 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java @@ -21,7 +21,8 @@ public interface TaskService { @Query("pulsar_topic") String pulsarTopic, @Query("scan_queue") Boolean scanQueue, @Query("scan_log") Boolean scanLog, - @Query("scan_base") Boolean scanBase + @Query("scan_base") Boolean scanBase, + @Query("filter_fields") String filterFields ); @Get(value = "/task/latest_op_ts", readTimeout = 2 * 60 * 1000) diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java index e6296d2..5309170 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java @@ -39,7 +39,8 @@ public class TaskController { @RequestParam(value = "hdfs", required = false) String hdfs, @RequestParam(value = "pulsar", required = false) String pulsar, @RequestParam(value = "topic", required = false) String topic, - @RequestParam(value = "mode", defaultValue = "") String mode + @RequestParam(value = "mode", defaultValue = "") String mode, + @RequestParam(value = "fields", required = false) String fields ) { if (StrUtil.isBlank(key)) { throw new RuntimeException("Key cannot be blank"); @@ -60,7 +61,7 @@ public class TaskController { throw new RuntimeException("Hdfs path cannot be empty"); } ExecutorProvider.EXECUTORS.submit(() -> { - String applicationId = taskService.scan(key, hdfs, pulsar, topic, scanQueue, scanLog, scanBase); + String applicationId = 
taskService.scan(key, hdfs, pulsar, topic, scanQueue, scanLog, scanBase, fields); logger.info("Task: {}", applicationId); }); return AmisResponse.responseSuccess(); diff --git a/service-web/src/main/resources/static/components/task-tab.js b/service-web/src/main/resources/static/components/task-tab.js index 9918871..162f02f 100644 --- a/service-web/src/main/resources/static/components/task-tab.js +++ b/service-web/src/main/resources/static/components/task-tab.js @@ -20,6 +20,7 @@ function taskTab() { pulsar: '${pulsar|default:undefined}', topic: '${topic|default:undefined}', mode: '${scan_mode|default:undefined}', + fields: '${fields|default:undefined}', } } }, @@ -56,6 +57,14 @@ function taskTab() { description: '输入表HDFS路径', autoComplete: '${base}/table/all_hdfs?key=$term', }, + { + type: 'input-text', + name: 'fields', + label: '指定字段', + visibleOn: '${CONTAINS(scan_mode, \'base\')}', + clearable: true, + description: '逗号分隔,可以大幅提高parquet文件检索速度,但无法获取指定字段外的字段内容', + }, { type: 'group', body: [