refactor(executor-task): 恢复pulsar读取策略

This commit is contained in:
v-zhangjc9
2024-05-30 15:09:54 +08:00
parent 9a6ed7e700
commit 633db5512d
10 changed files with 276 additions and 243 deletions

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
@@ -53,11 +54,13 @@ public class ReadPulsarSource implements Source<RecordView, ReadPulsarSplit, Col
ImmutableList<ReadPulsarSplit> tasks = TimeRangeHelper.range(startTimestamp, endTimestamp, TASK_GAP)
.collect(range -> new ReadPulsarSplit(
taskContext.getTaskId(),
IdUtil.nanoId(10),
pulsarUrl,
pulsarTopic,
latestMessageId.toString(),
range.getStart(),
range.getEnd()
range.getEnd(),
TASK_GAP
));
logger.info("Gap: {}, Splits: {}", TASK_GAP, tasks.size());
for (ReadPulsarSplit split : tasks) {
@@ -78,12 +81,12 @@ public class ReadPulsarSource implements Source<RecordView, ReadPulsarSplit, Col
}
@Override
public SourceReader<RecordView, ReadPulsarSplit> createReader(SourceReaderContext readerContext) throws PulsarClientException {
public SourceReader<RecordView, ReadPulsarSplit> createReader(SourceReaderContext readerContext) {
return new ReadPulsarSourceReader(readerContext);
}
@Override
public SplitEnumerator<ReadPulsarSplit, Collection<ReadPulsarSplit>> createEnumerator(SplitEnumeratorContext<ReadPulsarSplit> enumContext) throws Exception {
public SplitEnumerator<ReadPulsarSplit, Collection<ReadPulsarSplit>> createEnumerator(SplitEnumeratorContext<ReadPulsarSplit> enumContext) {
return new ReadPulsarSourceEnumerator(enumContext, splits);
}

View File

@@ -7,11 +7,7 @@ import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.FinishSplitEvent;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.slf4j.Logger;
@@ -25,30 +21,21 @@ public class ReadPulsarSourceEnumerator implements SplitEnumerator<ReadPulsarSpl
private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSourceEnumerator.class);
private final SplitEnumeratorContext<ReadPulsarSplit> context;
private final Queue<ReadPulsarSplit> readQueue;
private final AtomicInteger success = new AtomicInteger(0);
public ReadPulsarSourceEnumerator(SplitEnumeratorContext<ReadPulsarSplit> context, Collection<ReadPulsarSplit> splits) {
this.context = context;
this.readQueue = new ArrayDeque<>(splits);
this.success.set(splits.size());
}
@Override
public void start() {
}
/**
 * Handles a custom event sent to this enumerator by a reader subtask.
 *
 * <p>Only {@code FinishSplitEvent} is acted upon: the {@code success} counter is
 * decremented and its new value is logged. Any other event type is ignored.
 *
 * @param subtaskId   index of the subtask that sent the event (not used here)
 * @param sourceEvent the event received from the reader
 */
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
if (sourceEvent instanceof FinishSplitEvent) {
// The counter is seeded with the number of splits in the constructor, so the logged value counts down.
logger.info("{}", success.decrementAndGet());
}
}
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
final ReadPulsarSplit split = readQueue.poll();
if (ObjectUtil.isNotNull(split)) {
logger.info("t{} Assign split for {}, Queue rest: {}, Success: {}", subtaskId, subtaskId, readQueue.size(), success.get());
logger.info("t{} Assign split for {}, Queue rest: {}", subtaskId, subtaskId, readQueue.size());
context.assignSplit(split, subtaskId);
} else {
logger.info("t{} No more split for {}", subtaskId, subtaskId);

View File

@@ -15,15 +15,18 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.FinishSplitEvent;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,20 +45,12 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
private ReadPulsarSplit currentSplit;
private boolean noMoreSplits = false;
public ReadPulsarSourceReader(SourceReaderContext readerContext) throws PulsarClientException {
public ReadPulsarSourceReader(SourceReaderContext readerContext) {
this.readerContext = readerContext;
}
/**
 * Builds a {@link MessageId} from colon-separated text.
 *
 * <p>The first two fields are parsed as {@code long} values and handed to
 * {@code DefaultImplementation.newMessageId} together with a fixed third argument of
 * {@code -1}; any further fields in the text are ignored.
 * NOTE(review): the two fields are presumably the ledger id and entry id — confirm.
 *
 * @param messageIdText text of the form {@code "<long>:<long>[:...]"}
 * @return the message id created from the first two fields
 * @throws NumberFormatException          if either of the first two fields is not a valid long
 * @throws ArrayIndexOutOfBoundsException if the text contains fewer than two fields
 */
private static MessageId parseMessageId(String messageIdText) {
    final String[] fields = messageIdText.split(":");
    final long first = Long.parseLong(fields[0]);
    final long second = Long.parseLong(fields[1]);
    return DefaultImplementation.newMessageId(first, second, -1);
}
@Override
public void start() {
if (readQueue.isEmpty()) {
readerContext.sendSplitRequest();
}
}
private RecordView parsePulsarMessage(Message<String> message) {
@@ -71,7 +66,7 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
public InputStatus pollNext(ReaderOutput<RecordView> output) throws Exception {
logger.info("t{} Poll Next", readerContext.getIndexOfSubtask());
if (ObjectUtil.isNotNull(currentSplit)) {
logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getStartTimestamp());
logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getSplitId());
long startTimestamp = currentSplit.getStartTimestamp();
long endTimestamp = currentSplit.getEndTimestamp();
try (PulsarClient client = PulsarClient.builder()
@@ -106,7 +101,6 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
}
currentSplit = null;
readerContext.sendSourceEventToCoordinator(new FinishSplitEvent());
if (ObjectUtil.isEmpty(readQueue) && !noMoreSplits) {
readerContext.sendSplitRequest();
logger.info("t{} Request new split", readerContext.getIndexOfSubtask());
@@ -138,14 +132,14 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
@Override
public void addSplits(List<ReadPulsarSplit> splits) {
logger.info("t{} Add splits: {}", readerContext.getIndexOfSubtask(), splits.stream().map(ReadPulsarSplit::getStartTimestamp).collect(Collectors.toList()));
logger.info("t{} Receive add splits: {}", readerContext.getIndexOfSubtask(), splits.stream().map(ReadPulsarSplit::getSplitId).collect(Collectors.toList()));
readQueue.addAll(splits);
availability.complete(null);
}
@Override
public void notifyNoMoreSplits() {
logger.info("t{} No more splits for {}", readerContext.getIndexOfSubtask(), readerContext.getIndexOfSubtask());
logger.info("t{} Receive no more splits", readerContext.getIndexOfSubtask());
noMoreSplits = true;
availability.complete(null);
}

View File

@@ -9,22 +9,26 @@ import org.apache.flink.api.connector.source.SourceSplit;
*/
public class ReadPulsarSplit implements SourceSplit, Serializable {
private String taskId;
private String splitId;
private String pulsarUrl;
private String pulsarTopic;
private String latestMessageId;
private Long startTimestamp;
private Long endTimestamp;
private Long gap;
public ReadPulsarSplit() {
}
public ReadPulsarSplit(String taskId, String pulsarUrl, String pulsarTopic, String latestMessageId, Long startTimestamp, Long endTimestamp) {
public ReadPulsarSplit(String taskId, String splitId, String pulsarUrl, String pulsarTopic, String latestMessageId, Long startTimestamp, Long endTimestamp, Long gap) {
this.taskId = taskId;
this.splitId = splitId;
this.pulsarUrl = pulsarUrl;
this.pulsarTopic = pulsarTopic;
this.latestMessageId = latestMessageId;
this.startTimestamp = startTimestamp;
this.endTimestamp = endTimestamp;
this.gap = gap;
}
public String getTaskId() {
@@ -35,6 +39,14 @@ public class ReadPulsarSplit implements SourceSplit, Serializable {
this.taskId = taskId;
}
/**
 * Returns the value of the {@code splitId} field.
 *
 * <p>This is only one component of the string returned by {@link #splitId()}, which
 * concatenates it with the other fields of the split.
 *
 * @return the stored split id, or {@code null} if none has been set
 */
public String getSplitId() {
return splitId;
}
/**
 * Replaces the value of the {@code splitId} field.
 *
 * @param splitId the new split id to store
 */
public void setSplitId(String splitId) {
this.splitId = splitId;
}
public String getPulsarUrl() {
return pulsarUrl;
}
@@ -75,20 +87,30 @@ public class ReadPulsarSplit implements SourceSplit, Serializable {
this.endTimestamp = endTimestamp;
}
/**
 * Returns the gap value stored in this split.
 *
 * <p>NOTE(review): presumably the time-range step, in milliseconds, that was used when
 * the splits were generated — confirm against the code that constructs the splits.
 *
 * @return the stored gap, or {@code null} if none has been set
 */
public Long getGap() {
return gap;
}
/**
 * Replaces the gap value stored in this split.
 *
 * @param gap the new gap value to store
 */
public void setGap(Long gap) {
this.gap = gap;
}
@Override
public String splitId() {
return taskId + pulsarUrl + pulsarTopic + startTimestamp + endTimestamp + latestMessageId;
return taskId + splitId + pulsarUrl + pulsarTopic + startTimestamp + endTimestamp + latestMessageId;
}
/**
 * Renders every field of the split for logging and debugging.
 *
 * <p>String fields are wrapped in single quotes, numeric fields are written bare, and
 * unset fields appear as {@code null}.
 *
 * @return text of the form {@code ReadPulsarSplit{taskId='...', ..., gap=...}}
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder("ReadPulsarSplit{");
    // Quoted (String) fields.
    text.append("taskId='").append(taskId).append('\'');
    text.append(", splitId='").append(splitId).append('\'');
    text.append(", pulsarUrl='").append(pulsarUrl).append('\'');
    text.append(", pulsarTopic='").append(pulsarTopic).append('\'');
    text.append(", latestMessageId='").append(latestMessageId).append('\'');
    // Unquoted (Long) fields.
    text.append(", startTimestamp=").append(startTimestamp);
    text.append(", endTimestamp=").append(endTimestamp);
    text.append(", gap=").append(gap);
    return text.append('}').toString();
}
}

View File

@@ -1,27 +0,0 @@
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSplit;
import org.apache.flink.api.connector.source.SourceEvent;
import org.eclipse.collections.api.list.ImmutableList;
/**
 * Source event that carries a list of {@link ReadPulsarSplit} instances.
 *
 * <p>The list is held by reference exactly as passed to the constructor.
 *
 * @author lanyuanxiaoyao
 */
public class AddSplitEvent implements SourceEvent {
    /** Splits transported by this event. */
    private final ImmutableList<ReadPulsarSplit> splits;

    /**
     * Creates an event wrapping the given splits.
     *
     * @param splits the splits to carry
     */
    public AddSplitEvent(ImmutableList<ReadPulsarSplit> splits) {
        this.splits = splits;
    }

    /**
     * @return the splits supplied at construction time
     */
    public ImmutableList<ReadPulsarSplit> getSplits() {
        return this.splits;
    }

    /**
     * @return text of the form {@code AddSplitEvent{splits=...}}
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("AddSplitEvent{");
        text.append("splits=").append(this.splits);
        return text.append('}').toString();
    }
}

View File

@@ -1,9 +0,0 @@
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event;
import org.apache.flink.api.connector.source.SourceEvent;
/**
 * Payload-free marker {@link SourceEvent}: it declares no fields or methods, so the only
 * information it conveys is its type.
 *
 * <p>NOTE(review): judging by the name, it signals that a reader has finished processing
 * a split — confirm against the sending and receiving code.
 *
 * @author lanyuanxiaoyao
 */
public class FinishSplitEvent implements SourceEvent {
}

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.executor.task.helper;
import java.util.concurrent.TimeUnit;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList;
@@ -27,6 +28,7 @@ public class TimeRangeHelper {
}
public static ImmutableList<TimeRange> range(long start, long end, long gap) {
gap = Math.max(TimeUnit.MINUTES.toMillis(1), gap);
MutableList<TimeRange> ranges = Lists.mutable.empty();
while (start <= end) {
ranges.add(new TimeRange(start, Math.min(end, start + gap)));

View File

@@ -0,0 +1,21 @@
package com.lanyuanxiaoyao.service.executor.task;
import com.lanyuanxiaoyao.service.executor.task.helper.TimeRangeHelper;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
/**
 * Manual check for {@link TimeRangeHelper#range}: prints every range produced between a
 * fixed start instant and the current time, using a 30-minute gap.
 *
 * <p>Run via {@link #main(String[])}; the output depends on the current clock and the
 * system default time zone, so it is meant to be inspected by eye.
 *
 * @author lanyuanxiaoyao
 */
public class TimeRangeTest {
    /** Millisecond-precision timestamp layout, rendered in the system default zone. */
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
            .withLocale(Locale.CHINA)
            .withZone(ZoneId.systemDefault());

    /** Formats an epoch-millisecond value with {@link #FORMATTER}. */
    private static String format(long epochMilli) {
        return FORMATTER.format(Instant.ofEpochMilli(epochMilli));
    }

    public static void main(String[] args) {
        // One "start - end" line is printed per generated range.
        TimeRangeHelper.range(1716912000000L, Instant.now().toEpochMilli(), TimeUnit.MINUTES.toMillis(30))
                .forEach(range -> System.out.printf("%s - %s\n", format(range.getStart()), format(range.getEnd())));
    }
}