feat(executor-task): 数据扫描增加pulsar队列读取
This commit is contained in:
@@ -77,6 +77,12 @@
|
||||
<version>10.4.0</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.pulsar</groupId>
|
||||
<artifactId>pulsar-client</artifactId>
|
||||
<version>2.8.0</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@@ -6,11 +6,13 @@ import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
||||
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
|
||||
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
|
||||
import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile;
|
||||
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSource;
|
||||
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
|
||||
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@@ -67,70 +69,92 @@ public class DataScanner {
|
||||
logger.info("Context: {}", taskContext);
|
||||
|
||||
Map<String, Object> metadata = taskContext.getMetadata();
|
||||
ArgumentsHelper.checkMetadata(taskContext, "hdfs");
|
||||
String hdfs = (String) metadata.get("hdfs");
|
||||
ArgumentsHelper.checkMetadata(taskContext, "key");
|
||||
String key = (String) metadata.get("key");
|
||||
Boolean scanLog = (Boolean) metadata.getOrDefault("scan_log", true);
|
||||
Boolean scanData = (Boolean) metadata.getOrDefault("scan_data", false);
|
||||
if (!scanLog && !scanData) {
|
||||
throw new RuntimeException("Must choose mode scan_log or scan_data");
|
||||
}
|
||||
Boolean scanQueue = (Boolean) metadata.getOrDefault("scan_queue", false);
|
||||
Boolean scanLog = (Boolean) metadata.getOrDefault("scan_log", false);
|
||||
Boolean scanBase = (Boolean) metadata.getOrDefault("scan_base", false);
|
||||
|
||||
Configuration configuration = new Configuration();
|
||||
FileSystem fileSystem = FileSystem.get(configuration);
|
||||
if (!fileSystem.exists(new Path(hdfs))) {
|
||||
throw new RuntimeException(StrUtil.format("HDFS {} is not exists", hdfs));
|
||||
if (!scanQueue && !scanLog && !scanBase) {
|
||||
throw new RuntimeException("Must choose mode scan_queue or scan_log or scan_data");
|
||||
}
|
||||
|
||||
ImmutableList<Path> paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs)))
|
||||
.reject(status -> StrUtil.equals(".hoodie", status.getPath().getName()))
|
||||
.flatCollect(status -> {
|
||||
try {
|
||||
if (status.isDirectory()) {
|
||||
return Lists.immutable.of(fileSystem.listStatus(status.getPath()));
|
||||
} else {
|
||||
return Lists.immutable.of(status);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.collect(FileStatus::getPath);
|
||||
|
||||
StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
|
||||
|
||||
DataStream<RecordView> source = null;
|
||||
int totalParallelism = 20;
|
||||
if (scanLog) {
|
||||
ImmutableList<String> logPaths = paths.select(FSUtils::isLogFile).collect(Path::toString);
|
||||
int parallelism = Math.max(1, Math.min(logPaths.size() / 20, 100));
|
||||
totalParallelism = Math.max(totalParallelism, parallelism);
|
||||
source = environment
|
||||
.fromCollection(logPaths.toList())
|
||||
.name("Read log paths")
|
||||
.flatMap(new ReadHudiFile())
|
||||
.name("Read hudi file")
|
||||
.setParallelism(parallelism);
|
||||
}
|
||||
if (scanData) {
|
||||
ImmutableList<String> dataPaths = parsePaths(fileSystem, paths.select(FSUtils::isBaseFile));
|
||||
int parallelism = Math.max(1, Math.min(dataPaths.size() / 2, 500));
|
||||
totalParallelism = Math.max(totalParallelism, parallelism);
|
||||
if (scanQueue) {
|
||||
ArgumentsHelper.checkMetadata(taskContext, "pulsar");
|
||||
String pulsarUrl = (String) metadata.get("pulsar");
|
||||
ArgumentsHelper.checkMetadata(taskContext, "pulsar_topic");
|
||||
String pulsarTopic = (String) metadata.get("pulsar_topic");
|
||||
logger.info("Scan queue topic: {} url: {}", pulsarTopic, pulsarUrl);
|
||||
DataStream<RecordView> stream = environment
|
||||
.fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic, 50), WatermarkStrategy.noWatermarks(), "Read pulsar")
|
||||
.setParallelism(50)
|
||||
.disableChaining();
|
||||
if (ObjectUtil.isNull(source)) {
|
||||
source = environment
|
||||
source = stream;
|
||||
} else {
|
||||
source = source.union(stream);
|
||||
}
|
||||
}
|
||||
if (scanLog || scanBase) {
|
||||
ArgumentsHelper.checkMetadata(taskContext, "hdfs");
|
||||
String hdfs = (String) metadata.get("hdfs");
|
||||
Configuration configuration = new Configuration();
|
||||
FileSystem fileSystem = FileSystem.get(configuration);
|
||||
if (!fileSystem.exists(new Path(hdfs))) {
|
||||
throw new RuntimeException(StrUtil.format("HDFS {} is not exists", hdfs));
|
||||
}
|
||||
|
||||
ImmutableList<Path> paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs)))
|
||||
.reject(status -> StrUtil.equals(".hoodie", status.getPath().getName()))
|
||||
.flatCollect(status -> {
|
||||
try {
|
||||
if (status.isDirectory()) {
|
||||
return Lists.immutable.of(fileSystem.listStatus(status.getPath()));
|
||||
} else {
|
||||
return Lists.immutable.of(status);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.collect(FileStatus::getPath);
|
||||
if (scanLog) {
|
||||
logger.info("Scan log hdfs: {}", hdfs);
|
||||
ImmutableList<String> logPaths = paths.select(FSUtils::isLogFile).collect(Path::toString);
|
||||
int parallelism = Math.max(1, Math.min(logPaths.size() / 20, 100));
|
||||
totalParallelism = Math.max(totalParallelism, parallelism);
|
||||
DataStream<RecordView> stream = environment
|
||||
.fromCollection(logPaths.toList())
|
||||
.name("Read log paths")
|
||||
.flatMap(new ReadHudiFile())
|
||||
.name("Read hudi file")
|
||||
.setParallelism(parallelism);
|
||||
if (ObjectUtil.isNull(source)) {
|
||||
source = stream;
|
||||
} else {
|
||||
source = source.union(stream);
|
||||
}
|
||||
}
|
||||
if (scanBase) {
|
||||
logger.info("Scan base hdfs: {}", hdfs);
|
||||
ImmutableList<String> dataPaths = parsePaths(fileSystem, paths.select(FSUtils::isBaseFile));
|
||||
int parallelism = Math.max(1, Math.min(dataPaths.size() / 2, 500));
|
||||
totalParallelism = Math.max(totalParallelism, parallelism);
|
||||
DataStream<RecordView> stream = environment
|
||||
.fromCollection(dataPaths.toList())
|
||||
.name("Read base paths")
|
||||
.flatMap(new ReadHudiFile())
|
||||
.name("Read hudi file")
|
||||
.setParallelism(parallelism);
|
||||
} else {
|
||||
source = source.union(environment
|
||||
.fromCollection(dataPaths.toList())
|
||||
.name("Read base paths")
|
||||
.flatMap(new ReadHudiFile())
|
||||
.name("Read hudi file")
|
||||
.setParallelism(parallelism));
|
||||
if (ObjectUtil.isNull(source)) {
|
||||
source = stream;
|
||||
} else {
|
||||
source = source.union(stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ObjectUtil.isNull(source)) {
|
||||
@@ -147,6 +171,6 @@ public class DataScanner {
|
||||
.sinkTo(FlinkHelper.createFileSink(taskContext))
|
||||
.setParallelism(10)
|
||||
.name("Output results");
|
||||
environment.execute(StrUtil.format("Search {} in {}", key, hdfs));
|
||||
environment.execute();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,11 +12,13 @@ import java.util.Map;
|
||||
* @date 2024-01-09
|
||||
*/
|
||||
public class RecordView implements Serializable, Comparable<RecordView> {
|
||||
private final Operation operation;
|
||||
private final String data;
|
||||
private final String timestamp;
|
||||
private final String file;
|
||||
private final Map<String, Object> attributes;
|
||||
private Operation operation;
|
||||
private String data;
|
||||
private String timestamp;
|
||||
private String file;
|
||||
private Map<String, Object> attributes;
|
||||
|
||||
public RecordView() {}
|
||||
|
||||
public RecordView(Operation operation, String data, String timestamp, String file) {
|
||||
this.operation = operation;
|
||||
@@ -30,22 +32,42 @@ public class RecordView implements Serializable, Comparable<RecordView> {
|
||||
return operation;
|
||||
}
|
||||
|
||||
public void setOperation(Operation operation) {
|
||||
this.operation = operation;
|
||||
}
|
||||
|
||||
public String getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public void setData(String data) {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public String getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public void setTimestamp(String timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
public String getFile() {
|
||||
return file;
|
||||
}
|
||||
|
||||
public void setFile(String file) {
|
||||
this.file = file;
|
||||
}
|
||||
|
||||
public Map<String, Object> getAttributes() {
|
||||
return attributes;
|
||||
}
|
||||
|
||||
public void setAttributes(Map<String, Object> attributes) {
|
||||
this.attributes = attributes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return StrUtil.format("{} {} {} {}", operation, timestamp, file, data);
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
|
||||
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
|
||||
import java.io.Serializable;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.api.connector.source.*;
|
||||
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
|
||||
import org.apache.flink.core.io.SimpleVersionedSerializer;
|
||||
import org.apache.pulsar.client.api.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-18
|
||||
*/
|
||||
public class ReadPulsarSource implements Source<RecordView, ReadPulsarSplit, Collection<ReadPulsarSplit>>, ResultTypeQueryable<RecordView>, Serializable {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSource.class);
|
||||
private static final Long TASK_GAP = 6 * 60 * 60 * 1000L;
|
||||
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
|
||||
.withLocale(Locale.CHINA)
|
||||
.withZone(ZoneId.systemDefault());
|
||||
private final Collection<ReadPulsarSplit> splits;
|
||||
|
||||
public ReadPulsarSource(TaskContext taskContext, String pulsarUrl, String pulsarTopic, Integer parallelism) throws PulsarClientException {
|
||||
try (PulsarClient client = PulsarClient.builder()
|
||||
.serviceUrl(pulsarUrl)
|
||||
.build()) {
|
||||
try (Consumer<byte[]> consumer = client.newConsumer()
|
||||
.topic(pulsarTopic)
|
||||
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
|
||||
.subscriptionMode(SubscriptionMode.NonDurable)
|
||||
.subscriptionType(SubscriptionType.Exclusive)
|
||||
.subscriptionName(StrUtil.format("Task_Reader_Detect_{}", taskContext.getTaskId()))
|
||||
.startMessageIdInclusive()
|
||||
.subscribe()) {
|
||||
MessageId latestMessageId = consumer.getLastMessageId();
|
||||
Message<byte[]> message = consumer.receive();
|
||||
long startTimestamp = message.getPublishTime();
|
||||
long endTimestamp = Instant.now().toEpochMilli();
|
||||
long gap = Math.max((endTimestamp - startTimestamp) / (parallelism - 1), 1000 * 60 * 60);
|
||||
logger.info("Gap: {}, Parallelism: {}", gap, parallelism);
|
||||
List<ReadPulsarSplit> tasks = new ArrayList<>();
|
||||
while (startTimestamp < endTimestamp) {
|
||||
tasks.add(new ReadPulsarSplit(
|
||||
taskContext.getTaskId(),
|
||||
pulsarUrl,
|
||||
pulsarTopic,
|
||||
latestMessageId.toString(),
|
||||
startTimestamp,
|
||||
startTimestamp + gap
|
||||
));
|
||||
startTimestamp += gap;
|
||||
}
|
||||
splits = tasks;
|
||||
for (ReadPulsarSplit split : splits) {
|
||||
logger.info("Read split: {} -> {}", covertTimestamp(split.getStartTimestamp()), covertTimestamp(split.getEndTimestamp()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static String covertTimestamp(Long timestamp) {
|
||||
return FORMATTER.format(Instant.ofEpochMilli(timestamp));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boundedness getBoundedness() {
|
||||
return Boundedness.BOUNDED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SourceReader<RecordView, ReadPulsarSplit> createReader(SourceReaderContext readerContext) throws PulsarClientException {
|
||||
return new ReadPulsarSourceReader(readerContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SplitEnumerator<ReadPulsarSplit, Collection<ReadPulsarSplit>> createEnumerator(SplitEnumeratorContext<ReadPulsarSplit> enumContext) throws Exception {
|
||||
return new ReadPulsarSourceEnumerator(enumContext, splits);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SplitEnumerator<ReadPulsarSplit, Collection<ReadPulsarSplit>> restoreEnumerator(SplitEnumeratorContext<ReadPulsarSplit> enumContext, Collection<ReadPulsarSplit> checkpoint) throws Exception {
|
||||
return new ReadPulsarSourceEnumerator(enumContext, checkpoint);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleVersionedSerializer<ReadPulsarSplit> getSplitSerializer() {
|
||||
return new ReadPulsarVersionedSplitSerializer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleVersionedSerializer<Collection<ReadPulsarSplit>> getEnumeratorCheckpointSerializer() {
|
||||
return new ReadPulsarVersionedCheckpointSerializer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TypeInformation<RecordView> getProducedType() {
|
||||
return TypeInformation.of(RecordView.class);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.flink.api.connector.source.SplitEnumerator;
|
||||
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-18
|
||||
*/
|
||||
public class ReadPulsarSourceEnumerator implements SplitEnumerator<ReadPulsarSplit, Collection<ReadPulsarSplit>>, Serializable {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSourceEnumerator.class);
|
||||
private final SplitEnumeratorContext<ReadPulsarSplit> context;
|
||||
private final Queue<ReadPulsarSplit> readQueue;
|
||||
|
||||
public ReadPulsarSourceEnumerator(SplitEnumeratorContext<ReadPulsarSplit> context, Collection<ReadPulsarSplit> splits) {
|
||||
this.context = context;
|
||||
this.readQueue = new ArrayDeque<>(splits);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
|
||||
final ReadPulsarSplit split = readQueue.poll();
|
||||
if (ObjectUtil.isNotNull(split)) {
|
||||
logger.info("Assign split for {}, split: {}", subtaskId, split);
|
||||
context.assignSplit(split, subtaskId);
|
||||
} else {
|
||||
logger.info("No more split for {}", subtaskId);
|
||||
context.signalNoMoreSplits(subtaskId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addSplitsBack(List<ReadPulsarSplit> splits, int subtaskId) {
|
||||
readQueue.addAll(splits);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addReader(int subtaskId) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<ReadPulsarSplit> snapshotState(long checkpointId) throws Exception {
|
||||
return readQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar;
|
||||
|
||||
import cn.hutool.core.collection.ListUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
|
||||
import java.io.Serializable;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.flink.api.connector.source.ReaderOutput;
|
||||
import org.apache.flink.api.connector.source.SourceReader;
|
||||
import org.apache.flink.api.connector.source.SourceReaderContext;
|
||||
import org.apache.flink.core.io.InputStatus;
|
||||
import org.apache.pulsar.client.api.*;
|
||||
import org.apache.pulsar.client.internal.DefaultImplementation;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-18
|
||||
*/
|
||||
public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPulsarSplit>, Serializable {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSourceReader.class);
|
||||
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
|
||||
.withLocale(Locale.CHINA)
|
||||
.withZone(ZoneId.systemDefault());
|
||||
private final Queue<ReadPulsarSplit> readQueue = new ArrayDeque<>();
|
||||
private final SourceReaderContext readerContext;
|
||||
private CompletableFuture<Void> availability = new CompletableFuture<>();
|
||||
private ReadPulsarSplit currentSplit;
|
||||
private boolean noMoreSplits = false;
|
||||
|
||||
public ReadPulsarSourceReader(SourceReaderContext readerContext) throws PulsarClientException {
|
||||
this.readerContext = readerContext;
|
||||
}
|
||||
|
||||
private static MessageId parseMessageId(String messageIdText) {
|
||||
String[] items = messageIdText.split(":");
|
||||
return DefaultImplementation.newMessageId(Long.parseLong(items[0]), Long.parseLong(items[1]), -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (readQueue.isEmpty()) {
|
||||
readerContext.sendSplitRequest();
|
||||
}
|
||||
}
|
||||
|
||||
private RecordView parsePulsarMessage(Message<byte[]> message) {
|
||||
return new RecordView(
|
||||
RecordView.Operation.SOURCE,
|
||||
new String(message.getValue()),
|
||||
FORMATTER.format(Instant.ofEpochMilli(message.getPublishTime())),
|
||||
message.getMessageId().toString()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStatus pollNext(ReaderOutput<RecordView> output) throws Exception {
|
||||
if (ObjectUtil.isNotNull(currentSplit)) {
|
||||
logger.info("Read split: {}", currentSplit);
|
||||
try (PulsarClient client = PulsarClient.builder()
|
||||
.serviceUrl(currentSplit.getPulsarUrl())
|
||||
.build()) {
|
||||
try (Consumer<byte[]> consumer = client.newConsumer()
|
||||
.topic(currentSplit.getPulsarTopic())
|
||||
.batchReceivePolicy(
|
||||
BatchReceivePolicy.builder()
|
||||
.timeout(1, TimeUnit.SECONDS)
|
||||
.maxNumMessages(0)
|
||||
.maxNumBytes(0)
|
||||
.build()
|
||||
)
|
||||
.receiverQueueSize(50000)
|
||||
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
|
||||
.subscriptionMode(SubscriptionMode.NonDurable)
|
||||
.subscriptionType(SubscriptionType.Exclusive)
|
||||
.subscriptionName(StrUtil.format("Task_Reader_{}_{}", currentSplit.getTaskId(), readerContext.getIndexOfSubtask()))
|
||||
.startMessageIdInclusive()
|
||||
.subscribe()) {
|
||||
consumer.seek(currentSplit.getStartTimestamp());
|
||||
Messages<byte[]> messages = consumer.batchReceive();
|
||||
while (ObjectUtil.isNotNull(messages)) {
|
||||
long currentTimestamp = 0;
|
||||
for (Message<byte[]> message : messages) {
|
||||
currentTimestamp = message.getPublishTime();
|
||||
output.collect(parsePulsarMessage(message));
|
||||
}
|
||||
consumer.acknowledge(messages);
|
||||
if (currentTimestamp > currentSplit.getEndTimestamp()) {
|
||||
logger.info("Break for {} -> {}", currentTimestamp, currentSplit.getEndTimestamp());
|
||||
break;
|
||||
}
|
||||
messages = consumer.batchReceive();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return tryMoveToNextSplit();
|
||||
}
|
||||
|
||||
private InputStatus tryMoveToNextSplit() {
|
||||
currentSplit = readQueue.poll();
|
||||
logger.info("Current split: {}", currentSplit);
|
||||
if (ObjectUtil.isNotNull(currentSplit)) {
|
||||
return InputStatus.MORE_AVAILABLE;
|
||||
} else if (noMoreSplits) {
|
||||
return InputStatus.END_OF_INPUT;
|
||||
} else {
|
||||
if (availability.isDone()) {
|
||||
availability = new CompletableFuture<>();
|
||||
}
|
||||
return InputStatus.NOTHING_AVAILABLE;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ReadPulsarSplit> snapshotState(long checkpointId) {
|
||||
return ListUtil.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> isAvailable() {
|
||||
return availability;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addSplits(List<ReadPulsarSplit> splits) {
|
||||
logger.info("Add splits: {}", splits);
|
||||
readQueue.addAll(splits);
|
||||
availability.complete(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyNoMoreSplits() {
|
||||
logger.info("No more splits for {}", readerContext.getIndexOfSubtask());
|
||||
noMoreSplits = true;
|
||||
availability.complete(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar;
|
||||
|
||||
import java.io.Serializable;
|
||||
import org.apache.flink.api.connector.source.SourceSplit;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-18
|
||||
*/
|
||||
public class ReadPulsarSplit implements SourceSplit, Serializable {
|
||||
private String taskId;
|
||||
private String pulsarUrl;
|
||||
private String pulsarTopic;
|
||||
private String latestMessageId;
|
||||
private Long startTimestamp;
|
||||
private Long endTimestamp;
|
||||
|
||||
public ReadPulsarSplit() {
|
||||
}
|
||||
|
||||
public ReadPulsarSplit(String taskId, String pulsarUrl, String pulsarTopic, String latestMessageId, Long startTimestamp, Long endTimestamp) {
|
||||
this.taskId = taskId;
|
||||
this.pulsarUrl = pulsarUrl;
|
||||
this.pulsarTopic = pulsarTopic;
|
||||
this.latestMessageId = latestMessageId;
|
||||
this.startTimestamp = startTimestamp;
|
||||
this.endTimestamp = endTimestamp;
|
||||
}
|
||||
|
||||
public String getTaskId() {
|
||||
return taskId;
|
||||
}
|
||||
|
||||
public void setTaskId(String taskId) {
|
||||
this.taskId = taskId;
|
||||
}
|
||||
|
||||
public String getPulsarUrl() {
|
||||
return pulsarUrl;
|
||||
}
|
||||
|
||||
public void setPulsarUrl(String pulsarUrl) {
|
||||
this.pulsarUrl = pulsarUrl;
|
||||
}
|
||||
|
||||
public String getPulsarTopic() {
|
||||
return pulsarTopic;
|
||||
}
|
||||
|
||||
public void setPulsarTopic(String pulsarTopic) {
|
||||
this.pulsarTopic = pulsarTopic;
|
||||
}
|
||||
|
||||
public String getLatestMessageId() {
|
||||
return latestMessageId;
|
||||
}
|
||||
|
||||
public void setLatestMessageId(String latestMessageId) {
|
||||
this.latestMessageId = latestMessageId;
|
||||
}
|
||||
|
||||
public Long getStartTimestamp() {
|
||||
return startTimestamp;
|
||||
}
|
||||
|
||||
public void setStartTimestamp(Long startTimestamp) {
|
||||
this.startTimestamp = startTimestamp;
|
||||
}
|
||||
|
||||
public Long getEndTimestamp() {
|
||||
return endTimestamp;
|
||||
}
|
||||
|
||||
public void setEndTimestamp(Long endTimestamp) {
|
||||
this.endTimestamp = endTimestamp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String splitId() {
|
||||
return taskId + pulsarUrl + pulsarTopic + startTimestamp + endTimestamp + latestMessageId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ReadPulsarSplit{" +
|
||||
"taskId='" + taskId + '\'' +
|
||||
", pulsarUrl='" + pulsarUrl + '\'' +
|
||||
", pulsarTopic='" + pulsarTopic + '\'' +
|
||||
", latestMessageId='" + latestMessageId + '\'' +
|
||||
", startTimestamp=" + startTimestamp +
|
||||
", endTimestamp=" + endTimestamp +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import org.apache.flink.core.io.SimpleVersionedSerializer;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-18
|
||||
*/
|
||||
public class ReadPulsarVersionedCheckpointSerializer implements SimpleVersionedSerializer<Collection<ReadPulsarSplit>>, Serializable {
|
||||
private final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
@Override
|
||||
public int getVersion() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(Collection<ReadPulsarSplit> obj) throws IOException {
|
||||
return mapper.writeValueAsBytes(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<ReadPulsarSplit> deserialize(int version, byte[] serialized) throws IOException {
|
||||
return mapper.readValue(serialized, new TypeReference<List<ReadPulsarSplit>>() {
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import org.apache.flink.core.io.SimpleVersionedSerializer;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-18
|
||||
*/
|
||||
public class ReadPulsarVersionedSplitSerializer implements SimpleVersionedSerializer<ReadPulsarSplit>, Serializable {
|
||||
private final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
@Override
|
||||
public int getVersion() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(ReadPulsarSplit obj) throws IOException {
|
||||
return mapper.writeValueAsBytes(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadPulsarSplit deserialize(int version, byte[] serialized) throws IOException {
|
||||
return mapper.readValue(serialized, ReadPulsarSplit.class);
|
||||
}
|
||||
}
|
||||
@@ -28,7 +28,7 @@ public class FlinkHelper {
|
||||
|
||||
public static StreamExecutionEnvironment getBatchEnvironment() {
|
||||
StreamExecutionEnvironment environment = getSteamEnvironment();
|
||||
environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
|
||||
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
|
||||
return environment;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user