diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/TableInfoSearchCache.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/TableInfoSearchCache.java index 29ebb89..0f3c1c1 100644 --- a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/TableInfoSearchCache.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/TableInfoSearchCache.java @@ -10,14 +10,18 @@ public class TableInfoSearchCache { private Long flinkJobId; private String alias; private String hdfs; + private String pulsar; + private String topic; public TableInfoSearchCache() { } - public TableInfoSearchCache(Long flinkJobId, String alias, String hdfs) { + public TableInfoSearchCache(Long flinkJobId, String alias, String hdfs, String pulsar, String topic) { this.flinkJobId = flinkJobId; this.alias = alias; this.hdfs = hdfs; + this.pulsar = pulsar; + this.topic = topic; } public Long getFlinkJobId() { @@ -44,12 +48,30 @@ public class TableInfoSearchCache { this.hdfs = hdfs; } + public String getPulsar() { + return pulsar; + } + + public void setPulsar(String pulsar) { + this.pulsar = pulsar; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + @Override public String toString() { return "TableInfoSearchCache{" + - "flinkJobId=" + flinkJobId + - ", alias='" + alias + '\'' + - ", hdfs='" + hdfs + '\'' + - '}'; + "flinkJobId=" + flinkJobId + + ", alias='" + alias + '\'' + + ", hdfs='" + hdfs + '\'' + + ", pulsar='" + pulsar + '\'' + + ", topic='" + topic + '\'' + + '}'; } } diff --git a/service-executor/service-executor-manager/pom.xml b/service-executor/service-executor-manager/pom.xml index 7b1e984..e41090d 100644 --- a/service-executor/service-executor-manager/pom.xml +++ b/service-executor/service-executor-manager/pom.xml @@ -89,6 +89,11 @@ + + 
org.apache.pulsar + pulsar-client + 2.8.0 + diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/TaskController.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/TaskController.java index 3acdba1..417989e 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/TaskController.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/TaskController.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.executor.manager.controller; +import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.executor.manager.service.TaskService; import java.io.IOException; import org.eclipse.collections.api.list.ImmutableList; @@ -27,14 +28,27 @@ public class TaskController { @GetMapping("scan") public String scan( - @RequestParam("hdfs") String hdfs, @RequestParam("key") String key, - @RequestParam(value = "scan_log", defaultValue = "true") Boolean scanLog, - @RequestParam(value = "scan_data", defaultValue = "false") Boolean scanData, + @RequestParam(value = "hdfs", required = false) String hdfs, + @RequestParam(value = "pulsar", required = false) String pulsar, + @RequestParam(value = "pulsar_topic", required = false) String pulsarTopic, @RequestParam(value = "scan_source", defaultValue = "false") Boolean scanSource, + @RequestParam(value = "scan_queue", defaultValue = "false") Boolean scanQueue, + @RequestParam(value = "scan_log", defaultValue = "false") Boolean scanLog, + @RequestParam(value = "scan_base", defaultValue = "false") Boolean scanBase, @RequestParam(value = "scan_target", defaultValue = "false") Boolean scanTarget ) throws Exception { - return taskService.scanAvro(hdfs, key, scanLog, scanData, scanSource, scanTarget); + logger.info("Enter method: scan[key, hdfs, pulsar, pulsarTopic, 
scanSource, scanQueue, scanLog, scanBase, scanTarget]. " + "key:" + key + "," + "hdfs:" + hdfs + "," + "pulsar:" + pulsar + "," + "pulsarTopic:" + pulsarTopic + "," + "scanSource:" + scanSource + "," + "scanQueue:" + scanQueue + "," + "scanLog:" + scanLog + "," + "scanBase:" + scanBase + "," + "scanTarget:" + scanTarget); + if (!scanSource && !scanQueue && !scanLog && !scanBase && !scanTarget) { + throw new RuntimeException("Must choose one mode"); + } + if (scanQueue && (StrUtil.isBlank(pulsar) || StrUtil.isBlank(pulsarTopic))) { + throw new RuntimeException("Pulsar topic or url cannot be empty"); + } + if ((scanLog || scanBase) && StrUtil.isBlank(hdfs)) { + throw new RuntimeException("Hdfs path cannot be empty"); + } + return taskService.scanAvro(key, hdfs, pulsar, pulsarTopic, scanSource, scanQueue, scanLog, scanBase, scanTarget); } @GetMapping("results") diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java index 455716e..bca43d7 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java @@ -1,6 +1,8 @@ package com.lanyuanxiaoyao.service.executor.manager.service; import cn.hutool.core.io.IoUtil; +import cn.hutool.core.map.MapBuilder; +import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.utils.executor.Runner; @@ -108,11 +110,38 @@ public class TaskService { return configuration; } - public String scanAvro(String hdfs, String key, Boolean scanLog, Boolean scanData, Boolean scanSource, Boolean scanTarget) throws Exception { + public String scanAvro( + String key, +
String hdfs, + String pulsar, + String pulsarTopic, + Boolean scanSource, + Boolean scanQueue, + Boolean scanLog, + Boolean scanBase, + Boolean scanTarget + ) throws Exception { String taskId = taskId(); Configuration configuration = generateConfiguration(taskId, "scan"); - setEnvironment(configuration, "hdfs", hdfs); + MapBuilder builder = MapUtil.builder(); + setEnvironment(configuration, "key", key); + builder.put("key", key); + + if (scanLog || scanBase) { + setEnvironment(configuration, "hdfs", hdfs); + builder.put("scan_log", scanLog); + builder.put("scan_base", scanBase); + builder.put("hdfs", hdfs); + } + + if (scanQueue) { + setEnvironment(configuration, "pulsar", pulsar); + setEnvironment(configuration, "pulsar_topic", pulsarTopic); + builder.put("scan_queue", true); + builder.put("pulsar", pulsar); + builder.put("pulsar_topic", pulsarTopic); + } ApplicationId applicationId = Runner.run( configuration, "com.lanyuanxiaoyao.service.executor.task.DataScanner", @@ -122,16 +151,7 @@ public class TaskService { new TaskContext( taskId, executorConfiguration.getTaskResultPath(), - Maps.mutable.of( - "key", - key, - "hdfs", - hdfs, - "scan_log", - scanLog, - "scan_data", - scanData - ) + Maps.mutable.ofMap(builder.build()) ) ) } diff --git a/service-executor/service-executor-manager/src/main/resources/application.yml b/service-executor/service-executor-manager/src/main/resources/application.yml index 4518441..c388a86 100644 --- a/service-executor/service-executor-manager/src/main/resources/application.yml +++ b/service-executor/service-executor-manager/src/main/resources/application.yml @@ -2,7 +2,7 @@ spring: application: name: service-executor-manager profiles: - include: random-port,common,discovery,metrics + include: random-port,common,discovery,metrics,forest executor: staging-directory: hdfs://b2/apps/datalake/yarn history-server-archive-dir: hdfs://b2/apps/flink/completed-jobs/ diff --git a/service-executor/service-executor-task/pom.xml 
b/service-executor/service-executor-task/pom.xml index 408b07a..d0500ea 100644 --- a/service-executor/service-executor-task/pom.xml +++ b/service-executor/service-executor-task/pom.xml @@ -77,6 +77,12 @@ 10.4.0 provided + + org.apache.pulsar + pulsar-client + 2.8.0 + provided + diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java index 0b53645..987abd3 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java @@ -6,11 +6,13 @@ import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.executor.core.TaskContext; import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile; +import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSource; import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper; import java.io.IOException; import java.util.Map; import java.util.Optional; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.hadoop.conf.Configuration; @@ -67,70 +69,92 @@ public class DataScanner { logger.info("Context: {}", taskContext); Map metadata = taskContext.getMetadata(); - ArgumentsHelper.checkMetadata(taskContext, "hdfs"); - String hdfs = (String) metadata.get("hdfs"); ArgumentsHelper.checkMetadata(taskContext, "key"); String key = (String) metadata.get("key"); - Boolean scanLog = (Boolean) 
metadata.getOrDefault("scan_log", true); - Boolean scanData = (Boolean) metadata.getOrDefault("scan_data", false); - if (!scanLog && !scanData) { - throw new RuntimeException("Must choose mode scan_log or scan_data"); - } + Boolean scanQueue = (Boolean) metadata.getOrDefault("scan_queue", false); + Boolean scanLog = (Boolean) metadata.getOrDefault("scan_log", false); + Boolean scanBase = (Boolean) metadata.getOrDefault("scan_base", false); - Configuration configuration = new Configuration(); - FileSystem fileSystem = FileSystem.get(configuration); - if (!fileSystem.exists(new Path(hdfs))) { - throw new RuntimeException(StrUtil.format("HDFS {} is not exists", hdfs)); + if (!scanQueue && !scanLog && !scanBase) { + throw new RuntimeException("Must choose mode scan_queue or scan_log or scan_base"); + } - ImmutableList paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs))) - .reject(status -> StrUtil.equals(".hoodie", status.getPath().getName())) - .flatCollect(status -> { - try { - if (status.isDirectory()) { - return Lists.immutable.of(fileSystem.listStatus(status.getPath())); - } else { - return Lists.immutable.of(status); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .collect(FileStatus::getPath); - StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment(); DataStream source = null; int totalParallelism = 20; - if (scanLog) { - ImmutableList logPaths = paths.select(FSUtils::isLogFile).collect(Path::toString); - int parallelism = Math.max(1, Math.min(logPaths.size() / 20, 100)); - totalParallelism = Math.max(totalParallelism, parallelism); - source = environment - .fromCollection(logPaths.toList()) - .name("Read log paths") - .flatMap(new ReadHudiFile()) - .name("Read hudi file") - .setParallelism(parallelism); - } - if (scanData) { - ImmutableList dataPaths = parsePaths(fileSystem, paths.select(FSUtils::isBaseFile)); - int parallelism = Math.max(1, Math.min(dataPaths.size() / 2, 500)); - totalParallelism = 
Math.max(totalParallelism, parallelism); + if (scanQueue) { + ArgumentsHelper.checkMetadata(taskContext, "pulsar"); + String pulsarUrl = (String) metadata.get("pulsar"); + ArgumentsHelper.checkMetadata(taskContext, "pulsar_topic"); + String pulsarTopic = (String) metadata.get("pulsar_topic"); + logger.info("Scan queue topic: {} url: {}", pulsarTopic, pulsarUrl); + DataStream stream = environment + .fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic, 50), WatermarkStrategy.noWatermarks(), "Read pulsar") + .setParallelism(50) + .disableChaining(); if (ObjectUtil.isNull(source)) { - source = environment + source = stream; + } else { + source = source.union(stream); + } + } + if (scanLog || scanBase) { + ArgumentsHelper.checkMetadata(taskContext, "hdfs"); + String hdfs = (String) metadata.get("hdfs"); + Configuration configuration = new Configuration(); + FileSystem fileSystem = FileSystem.get(configuration); + if (!fileSystem.exists(new Path(hdfs))) { + throw new RuntimeException(StrUtil.format("HDFS {} is not exists", hdfs)); + } + + ImmutableList paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs))) + .reject(status -> StrUtil.equals(".hoodie", status.getPath().getName())) + .flatCollect(status -> { + try { + if (status.isDirectory()) { + return Lists.immutable.of(fileSystem.listStatus(status.getPath())); + } else { + return Lists.immutable.of(status); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(FileStatus::getPath); + if (scanLog) { + logger.info("Scan log hdfs: {}", hdfs); + ImmutableList logPaths = paths.select(FSUtils::isLogFile).collect(Path::toString); + int parallelism = Math.max(1, Math.min(logPaths.size() / 20, 100)); + totalParallelism = Math.max(totalParallelism, parallelism); + DataStream stream = environment + .fromCollection(logPaths.toList()) + .name("Read log paths") + .flatMap(new ReadHudiFile()) + .name("Read hudi file") + .setParallelism(parallelism); + if 
(ObjectUtil.isNull(source)) { + source = stream; + } else { + source = source.union(stream); + } + } + if (scanBase) { + logger.info("Scan base hdfs: {}", hdfs); + ImmutableList dataPaths = parsePaths(fileSystem, paths.select(FSUtils::isBaseFile)); + int parallelism = Math.max(1, Math.min(dataPaths.size() / 2, 500)); + totalParallelism = Math.max(totalParallelism, parallelism); + DataStream stream = environment .fromCollection(dataPaths.toList()) .name("Read base paths") .flatMap(new ReadHudiFile()) .name("Read hudi file") .setParallelism(parallelism); - } else { - source = source.union(environment - .fromCollection(dataPaths.toList()) - .name("Read base paths") - .flatMap(new ReadHudiFile()) - .name("Read hudi file") - .setParallelism(parallelism)); + if (ObjectUtil.isNull(source)) { + source = stream; + } else { + source = source.union(stream); + } } } if (ObjectUtil.isNull(source)) { @@ -147,6 +171,6 @@ public class DataScanner { .sinkTo(FlinkHelper.createFileSink(taskContext)) .setParallelism(10) .name("Output results"); - environment.execute(StrUtil.format("Search {} in {}", key, hdfs)); + environment.execute(); } } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java index c0758af..d5553fd 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java @@ -12,11 +12,13 @@ import java.util.Map; * @date 2024-01-09 */ public class RecordView implements Serializable, Comparable { - private final Operation operation; - private final String data; - private final String timestamp; - private final String file; - private final Map attributes; + private Operation operation; + private String 
data; + private String timestamp; + private String file; + private Map attributes; + + public RecordView() {} public RecordView(Operation operation, String data, String timestamp, String file) { this.operation = operation; @@ -30,22 +32,42 @@ public class RecordView implements Serializable, Comparable { return operation; } + public void setOperation(Operation operation) { + this.operation = operation; + } + public String getData() { return data; } + public void setData(String data) { + this.data = data; + } + public String getTimestamp() { return timestamp; } + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + public String getFile() { return file; } + public void setFile(String file) { + this.file = file; + } + public Map getAttributes() { return attributes; } + public void setAttributes(Map attributes) { + this.attributes = attributes; + } + @Override public String toString() { return StrUtil.format("{} {} {} {}", operation, timestamp, file, data); diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java new file mode 100644 index 0000000..b228ccd --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java @@ -0,0 +1,110 @@ +package com.lanyuanxiaoyao.service.executor.task.functions.pulsar; + +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; +import java.io.Serializable; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import 
org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.*; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.pulsar.client.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author lanyuanxiaoyao + * @date 2024-01-18 + */ +public class ReadPulsarSource implements Source>, ResultTypeQueryable, Serializable { + private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSource.class); + private static final Long TASK_GAP = 6 * 60 * 60 * 1000L; + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withLocale(Locale.CHINA) + .withZone(ZoneId.systemDefault()); + private final Collection splits; + + public ReadPulsarSource(TaskContext taskContext, String pulsarUrl, String pulsarTopic, Integer parallelism) throws PulsarClientException { + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarUrl) + .build()) { + try (Consumer consumer = client.newConsumer() + .topic(pulsarTopic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionMode(SubscriptionMode.NonDurable) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(StrUtil.format("Task_Reader_Detect_{}", taskContext.getTaskId())) + .startMessageIdInclusive() + .subscribe()) { + MessageId latestMessageId = consumer.getLastMessageId(); + Message message = consumer.receive(); + long startTimestamp = message.getPublishTime(); + long endTimestamp = Instant.now().toEpochMilli(); + long gap = Math.max((endTimestamp - startTimestamp) / (parallelism - 1), 1000 * 60 * 60); + logger.info("Gap: {}, Parallelism: {}", gap, parallelism); + List tasks = new ArrayList<>(); + while (startTimestamp < endTimestamp) { + tasks.add(new ReadPulsarSplit( + taskContext.getTaskId(), + pulsarUrl, + pulsarTopic, + latestMessageId.toString(), + startTimestamp, 
+ startTimestamp + gap + )); + startTimestamp += gap; + } + splits = tasks; + for (ReadPulsarSplit split : splits) { + logger.info("Read split: {} -> {}", covertTimestamp(split.getStartTimestamp()), covertTimestamp(split.getEndTimestamp())); + } + } + } + } + + private static String covertTimestamp(Long timestamp) { + return FORMATTER.format(Instant.ofEpochMilli(timestamp)); + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) throws PulsarClientException { + return new ReadPulsarSourceReader(readerContext); + } + + @Override + public SplitEnumerator> createEnumerator(SplitEnumeratorContext enumContext) throws Exception { + return new ReadPulsarSourceEnumerator(enumContext, splits); + } + + @Override + public SplitEnumerator> restoreEnumerator(SplitEnumeratorContext enumContext, Collection checkpoint) throws Exception { + return new ReadPulsarSourceEnumerator(enumContext, checkpoint); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return new ReadPulsarVersionedSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer> getEnumeratorCheckpointSerializer() { + return new ReadPulsarVersionedCheckpointSerializer(); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(RecordView.class); + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java new file mode 100644 index 0000000..63b958f --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java @@ -0,0 +1,63 @@ +package 
com.lanyuanxiaoyao.service.executor.task.functions.pulsar; + +import cn.hutool.core.util.ObjectUtil; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import javax.annotation.Nullable; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author lanyuanxiaoyao + * @date 2024-01-18 + */ +public class ReadPulsarSourceEnumerator implements SplitEnumerator>, Serializable { + private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSourceEnumerator.class); + private final SplitEnumeratorContext context; + private final Queue readQueue; + + public ReadPulsarSourceEnumerator(SplitEnumeratorContext context, Collection splits) { + this.context = context; + this.readQueue = new ArrayDeque<>(splits); + } + + @Override + public void start() { + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + final ReadPulsarSplit split = readQueue.poll(); + if (ObjectUtil.isNotNull(split)) { + logger.info("Assign split for {}, split: {}", subtaskId, split); + context.assignSplit(split, subtaskId); + } else { + logger.info("No more split for {}", subtaskId); + context.signalNoMoreSplits(subtaskId); + } + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + readQueue.addAll(splits); + } + + @Override + public void addReader(int subtaskId) { + } + + @Override + public Collection snapshotState(long checkpointId) throws Exception { + return readQueue; + } + + @Override + public void close() throws IOException { + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java 
b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java new file mode 100644 index 0000000..77d304b --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java @@ -0,0 +1,152 @@ +package com.lanyuanxiaoyao.service.executor.task.functions.pulsar; + +import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; +import java.io.Serializable; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Locale; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.internal.DefaultImplementation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author lanyuanxiaoyao + * @date 2024-01-18 + */ +public class ReadPulsarSourceReader implements SourceReader, Serializable { + private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSourceReader.class); + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS") + .withLocale(Locale.CHINA) + .withZone(ZoneId.systemDefault()); + private final Queue readQueue = new ArrayDeque<>(); + private final SourceReaderContext readerContext; + private CompletableFuture availability = new CompletableFuture<>(); + private ReadPulsarSplit currentSplit; + private boolean noMoreSplits = false; + + 
public ReadPulsarSourceReader(SourceReaderContext readerContext) throws PulsarClientException { + this.readerContext = readerContext; + } + + private static MessageId parseMessageId(String messageIdText) { + String[] items = messageIdText.split(":"); + return DefaultImplementation.newMessageId(Long.parseLong(items[0]), Long.parseLong(items[1]), -1); + } + + @Override + public void start() { + if (readQueue.isEmpty()) { + readerContext.sendSplitRequest(); + } + } + + private RecordView parsePulsarMessage(Message message) { + return new RecordView( + RecordView.Operation.SOURCE, + new String(message.getValue()), + FORMATTER.format(Instant.ofEpochMilli(message.getPublishTime())), + message.getMessageId().toString() + ); + } + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + if (ObjectUtil.isNotNull(currentSplit)) { + logger.info("Read split: {}", currentSplit); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(currentSplit.getPulsarUrl()) + .build()) { + try (Consumer consumer = client.newConsumer() + .topic(currentSplit.getPulsarTopic()) + .batchReceivePolicy( + BatchReceivePolicy.builder() + .timeout(1, TimeUnit.SECONDS) + .maxNumMessages(0) + .maxNumBytes(0) + .build() + ) + .receiverQueueSize(50000) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionMode(SubscriptionMode.NonDurable) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(StrUtil.format("Task_Reader_{}_{}", currentSplit.getTaskId(), readerContext.getIndexOfSubtask())) + .startMessageIdInclusive() + .subscribe()) { + consumer.seek(currentSplit.getStartTimestamp()); + Messages messages = consumer.batchReceive(); + while (ObjectUtil.isNotNull(messages)) { + long currentTimestamp = 0; + for (Message message : messages) { + currentTimestamp = message.getPublishTime(); + output.collect(parsePulsarMessage(message)); + } + consumer.acknowledge(messages); + if (currentTimestamp > currentSplit.getEndTimestamp()) { 
+ logger.info("Break for {} -> {}", currentTimestamp, currentSplit.getEndTimestamp()); + break; + } + messages = consumer.batchReceive(); + } + } + } + } + return tryMoveToNextSplit(); + } + + private InputStatus tryMoveToNextSplit() { + currentSplit = readQueue.poll(); + logger.info("Current split: {}", currentSplit); + if (ObjectUtil.isNotNull(currentSplit)) { + return InputStatus.MORE_AVAILABLE; + } else if (noMoreSplits) { + return InputStatus.END_OF_INPUT; + } else { + if (availability.isDone()) { + availability = new CompletableFuture<>(); + } + return InputStatus.NOTHING_AVAILABLE; + } + } + + @Override + public List snapshotState(long checkpointId) { + return ListUtil.empty(); + } + + @Override + public CompletableFuture isAvailable() { + return availability; + } + + @Override + public void addSplits(List splits) { + logger.info("Add splits: {}", splits); + readQueue.addAll(splits); + availability.complete(null); + } + + @Override + public void notifyNoMoreSplits() { + logger.info("No more splits for {}", readerContext.getIndexOfSubtask()); + noMoreSplits = true; + availability.complete(null); + } + + @Override + public void close() { + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSplit.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSplit.java new file mode 100644 index 0000000..765ac02 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSplit.java @@ -0,0 +1,94 @@ +package com.lanyuanxiaoyao.service.executor.task.functions.pulsar; + +import java.io.Serializable; +import org.apache.flink.api.connector.source.SourceSplit; + +/** + * @author lanyuanxiaoyao + * @date 2024-01-18 + */ +public class ReadPulsarSplit implements SourceSplit, Serializable { + private String taskId; + private String 
pulsarUrl; + private String pulsarTopic; + private String latestMessageId; + private Long startTimestamp; + private Long endTimestamp; + + public ReadPulsarSplit() { + } + + public ReadPulsarSplit(String taskId, String pulsarUrl, String pulsarTopic, String latestMessageId, Long startTimestamp, Long endTimestamp) { + this.taskId = taskId; + this.pulsarUrl = pulsarUrl; + this.pulsarTopic = pulsarTopic; + this.latestMessageId = latestMessageId; + this.startTimestamp = startTimestamp; + this.endTimestamp = endTimestamp; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public String getPulsarUrl() { + return pulsarUrl; + } + + public void setPulsarUrl(String pulsarUrl) { + this.pulsarUrl = pulsarUrl; + } + + public String getPulsarTopic() { + return pulsarTopic; + } + + public void setPulsarTopic(String pulsarTopic) { + this.pulsarTopic = pulsarTopic; + } + + public String getLatestMessageId() { + return latestMessageId; + } + + public void setLatestMessageId(String latestMessageId) { + this.latestMessageId = latestMessageId; + } + + public Long getStartTimestamp() { + return startTimestamp; + } + + public void setStartTimestamp(Long startTimestamp) { + this.startTimestamp = startTimestamp; + } + + public Long getEndTimestamp() { + return endTimestamp; + } + + public void setEndTimestamp(Long endTimestamp) { + this.endTimestamp = endTimestamp; + } + + @Override + public String splitId() { + return taskId + pulsarUrl + pulsarTopic + startTimestamp + endTimestamp + latestMessageId; + } + + @Override + public String toString() { + return "ReadPulsarSplit{" + + "taskId='" + taskId + '\'' + + ", pulsarUrl='" + pulsarUrl + '\'' + + ", pulsarTopic='" + pulsarTopic + '\'' + + ", latestMessageId='" + latestMessageId + '\'' + + ", startTimestamp=" + startTimestamp + + ", endTimestamp=" + endTimestamp + + '}'; + } +} diff --git 
a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarVersionedCheckpointSerializer.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarVersionedCheckpointSerializer.java new file mode 100644 index 0000000..2f7886b --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarVersionedCheckpointSerializer.java @@ -0,0 +1,33 @@ +package com.lanyuanxiaoyao.service.executor.task.functions.pulsar; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +/** + * @author lanyuanxiaoyao + * @date 2024-01-18 + */ +public class ReadPulsarVersionedCheckpointSerializer implements SimpleVersionedSerializer>, Serializable { + private final ObjectMapper mapper = new ObjectMapper(); + + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(Collection obj) throws IOException { + return mapper.writeValueAsBytes(obj); + } + + @Override + public Collection deserialize(int version, byte[] serialized) throws IOException { + return mapper.readValue(serialized, new TypeReference>() { + }); + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarVersionedSplitSerializer.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarVersionedSplitSerializer.java new file mode 100644 index 0000000..d7e3fbb --- /dev/null +++ 
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.core.io.SimpleVersionedSerializer;

/**
 * Serializes a single {@link ReadPulsarSplit} as JSON for Flink's versioned
 * split (de)serialization.
 *
 * <p>Version is fixed at 0; bump it if the JSON layout of {@link ReadPulsarSplit}
 * ever changes incompatibly.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-18
 */
public class ReadPulsarVersionedSplitSerializer
        implements SimpleVersionedSerializer<ReadPulsarSplit>, Serializable {
    // ObjectMapper is thread-safe once configured; one instance per serializer is enough.
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public int getVersion() {
        return 0;
    }

    @Override
    public byte[] serialize(ReadPulsarSplit obj) throws IOException {
        return mapper.writeValueAsBytes(obj);
    }

    @Override
    public ReadPulsarSplit deserialize(int version, byte[] serialized) throws IOException {
        return mapper.readValue(serialized, ReadPulsarSplit.class);
    }
}
b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java index 2622278..6ed552e 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java @@ -155,6 +155,18 @@ public interface InfoService { @Get("/info/all_hdfs") ImmutableList allHdfs(@Query("key") String key); + @Get("/info/all_pulsar") + ImmutableList allPulsar(); + + @Get("/info/all_pulsar") + ImmutableList allPulsar(@Query("key") String key); + + @Get("/info/all_pulsar_topic") + ImmutableList allPulsarTopic(); + + @Get("/info/all_pulsar_topic") + ImmutableList allPulsarTopic(@Query("key") String key); + @Get("/info/simple_table_metas") ImmutableList simpleTableMetas(); diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java index b83b9e8..cb81080 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java @@ -14,10 +14,15 @@ import org.eclipse.collections.api.list.ImmutableList; @BaseRequest(baseURL = "http://service-executor-manager") public interface TaskService { @Get(value = "/task/scan", readTimeout = 2 * 60 * 1000) - String scan(@Query("hdfs") String hdfs, @Query("key") String key); - - @Get(value = "/task/scan", readTimeout = 2 * 60 * 1000) - String scan(@Query("hdfs") String hdfs, @Query("key") String key, @Query("scan_log") Boolean scanLog, @Query("scan_data") Boolean scanData); + String scan( + @Query("key") String key, + @Query("hdfs") String hdfs, + @Query("pulsar") String pulsar, + @Query("pulsar_topic") String pulsarTopic, + @Query("scan_queue") Boolean scanQueue, + @Query("scan_log") Boolean scanLog, + @Query("scan_base") Boolean scanBase + ); @Get("/task/results") 
ImmutableList results(@Query("task_id") String taskId); diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java index 46aa0a3..23ffcae 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java @@ -96,8 +96,24 @@ public class InfoController { @GetMapping("/all_hdfs") public ImmutableList allHdfs(@RequestParam(value = "key", required = false) String key) { return infoService.allTableInfoSearchCache() - .select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key)) + .select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key) || StrUtil.contains(cache.getHdfs(), key)) .collect(TableInfoSearchCache::getHdfs) .distinct(); } + + @GetMapping("/all_pulsar") + public ImmutableList allPulsar(@RequestParam(value = "key", required = false) String key) { + return infoService.allTableInfoSearchCache() + .select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key) || StrUtil.contains(cache.getPulsar(), key)) + .collect(TableInfoSearchCache::getPulsar) + .distinct(); + } + + @GetMapping("/all_pulsar_topic") + public ImmutableList allPulsarTopic(@RequestParam(value = "key", required = false) String key) { + return infoService.allTableInfoSearchCache() + .select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key) || StrUtil.contains(cache.getTopic(), key)) + .collect(TableInfoSearchCache::getTopic) + .distinct(); + } } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index 837913c..a6cb650 100644 --- 
a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -9,7 +9,6 @@ import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.configuration.entity.info.TableInfoSearchCache; -import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider; import java.util.List; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; @@ -180,13 +179,25 @@ public class InfoService extends BaseService { @Retryable(Throwable.class) public ImmutableList allTableInfoSearchCache() { return Lists.immutable.ofAll(mysqlJdbcTemplate.query( - SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A, TbAppCollectTableInfo.TGT_HDFS_PATH_A) + SqlBuilder.select( + TbAppCollectTableInfo.FLINK_JOB_ID_A, + TbAppCollectTableInfo.ALIAS_A, + TbAppCollectTableInfo.TGT_HDFS_PATH_A, + TbAppCollectTableInfo.SRC_PULSAR_ADDR_A, + TbAppCollectTableInfo.SRC_TOPIC_A + ) .from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_) .whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) .andEq(TbAppFlinkJobConfig.STATUS_A, "y") .andEq(TbAppCollectTableInfo.STATUS_A, "y") .build(), - (rs, row) -> new TableInfoSearchCache(rs.getLong(1), rs.getString(2), rs.getString(3)) + (rs, row) -> new TableInfoSearchCache( + rs.getLong(1), + rs.getString(2), + rs.getString(3), + rs.getString(4), + rs.getString(5) + ) )); } } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java index 6582112..6b7c0eb 100644 --- 
a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java @@ -220,4 +220,22 @@ public class TableController extends BaseController { } return AmisResponse.responseSuccess(infoService.allHdfs(key).collect(Item::new)); } + + @SuppressWarnings("DataFlowIssue") + @GetMapping("all_pulsar") + public AmisResponse> allPulsar(@RequestParam(value = "key", required = false) String key) { + if (StrUtil.isBlank(key)) { + return AmisResponse.responseSuccess(infoService.allPulsar().collect(Item::new)); + } + return AmisResponse.responseSuccess(infoService.allPulsar(key).collect(Item::new)); + } + + @SuppressWarnings("DataFlowIssue") + @GetMapping("all_pulsar_topic") + public AmisResponse> allPulsarTopic(@RequestParam(value = "key", required = false) String key) { + if (StrUtil.isBlank(key)) { + return AmisResponse.responseSuccess(infoService.allPulsarTopic().collect(Item::new)); + } + return AmisResponse.responseSuccess(infoService.allPulsarTopic(key).collect(Item::new)); + } } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java index c5d1a34..d5d4f2e 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java @@ -2,6 +2,7 @@ package com.lanyuanxiaoyao.service.web.controller; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import com.lanyuanxiaoyao.service.forest.service.PulsarService; import com.lanyuanxiaoyao.service.forest.service.TaskService; import com.lanyuanxiaoyao.service.web.controller.base.AmisMapResponse; import com.lanyuanxiaoyao.service.web.controller.base.AmisResponse; @@ -25,24 +26,35 @@ public class TaskController { 
private static final Logger logger = LoggerFactory.getLogger(TaskController.class); private final TaskService taskService; + private final PulsarService pulsarService; - public TaskController(TaskService taskService) { + public TaskController(TaskService taskService, PulsarService pulsarService) { this.taskService = taskService; + this.pulsarService = pulsarService; } @GetMapping("scan") public AmisResponse scan( - @RequestParam("hdfs") String hdfs, @RequestParam("key") String key, + @RequestParam(value = "hdfs", required = false) String hdfs, + @RequestParam(value = "pulsar", required = false) String pulsar, + @RequestParam(value = "topic", required = false) String topic, @RequestParam(value = "mode", defaultValue = "") String mode ) { - if (StrUtil.isBlank(hdfs) || StrUtil.isBlank(key)) { - throw new RuntimeException("Argument cannot be blank"); + if (StrUtil.isBlank(key)) { + throw new RuntimeException("Key cannot be blank"); + } + boolean scanQueue = StrUtil.contains(mode, "queue"); + boolean scanLog = StrUtil.contains(mode, "log"); + boolean scanBase = StrUtil.contains(mode, "base"); + if (scanQueue && (StrUtil.isBlank(topic) || StrUtil.isBlank(pulsar))) { + throw new RuntimeException("Pulsar topic or url cannot be empty"); + } + if ((scanLog || scanBase) && StrUtil.isBlank(hdfs)) { + throw new RuntimeException("Hdfs path cannot be empty"); } ExecutorProvider.EXECUTORS.submit(() -> { - boolean scanLog = StrUtil.contains(mode, "log"); - boolean scanData = StrUtil.contains(mode, "data"); - String applicationId = taskService.scan(hdfs, key, scanLog, scanData); + String applicationId = taskService.scan(key, hdfs, pulsar, topic, scanQueue, scanLog, scanBase); logger.info("Task: {}", applicationId); }); return AmisResponse.responseSuccess(); diff --git a/service-web/src/main/resources/static/components/task-tab.js b/service-web/src/main/resources/static/components/task-tab.js index 0531df9..f656509 100644 --- 
a/service-web/src/main/resources/static/components/task-tab.js +++ b/service-web/src/main/resources/static/components/task-tab.js @@ -15,8 +15,10 @@ function taskTab() { method: 'get', url: '${base}/task/scan', data: { - hdfs: '${hdfs|default:undefined}', key: '${key|default:undefined}', + hdfs: '${hdfs|default:undefined}', + pulsar: '${pulsar|default:undefined}', + topic: '${topic|default:undefined}', mode: '${scan_mode|default:undefined}', } } @@ -31,31 +33,52 @@ function taskTab() { required: true, value: 'log', options: [ + {label: '消息队列', value: 'queue'}, {label: '日志文件', value: 'log'}, - {label: '数据文件', value: 'data'}, + {label: '数据文件', value: 'base'}, ] }, + { + type: 'input-text', + name: 'key', + label: '检索字段', + required: true, + clearable: true, + description: '检索带有该字符的记录', + }, + { + type: 'input-text', + name: 'hdfs', + label: 'HDFS路经', + requiredOn: '${CONTAINS(scan_mode, \'log\') || CONTAINS(scan_mode, \'base\')}', + visibleOn: '${CONTAINS(scan_mode, \'log\') || CONTAINS(scan_mode, \'base\')}', + clearable: true, + description: '输入表HDFS路径', + autoComplete: '${base}/table/all_hdfs?key=$term', + }, { type: 'group', body: [ { type: 'input-text', - name: 'key', - label: '检索字段', - required: true, + name: 'topic', + label: 'Pulsar主题', + requiredOn: '${CONTAINS(scan_mode, \'queue\')}', + visibleOn: '${CONTAINS(scan_mode, \'queue\')}', clearable: true, - description: '检索带有该字符的记录', + description: '输入Pulsar主题', + autoComplete: '${base}/table/all_pulsar_topic?key=$term', columnRatio: 4, }, { type: 'input-text', - name: 'hdfs', - label: 'HDFS路经', - required: true, + name: 'pulsar', + label: 'Pulsar地址', + requiredOn: '${CONTAINS(scan_mode, \'queue\')}', + visibleOn: '${CONTAINS(scan_mode, \'queue\')}', clearable: true, - description: '输入表HDFS路径', - autoComplete: '${base}/table/all_hdfs?key=$term', - columnRatio: 8, + description: '输入Pulsar地址', + autoComplete: '${base}/table/all_pulsar?key=$term', }, ] }